Skip to content

handle subgraph errors #1778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/wise-panthers-collect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'backend': patch
---

handle subgraph errors
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export const syncBptBalancesFromRpc = async (
client: ViemClient,
chain: Chain,
syncCategory: 'BPT_BALANCES_V2' | 'BPT_BALANCES_V3' | 'BPT_BALANCES_COW_AMM',
syncStore = true, // Set false when syncing should be repeated, eg when passing subset of poolIds just for new pools
) => {
// Must have poolIds to sync
if (poolIds.length === 0) {
Expand All @@ -27,7 +28,11 @@ export const syncBptBalancesFromRpc = async (
const endBlock = await client.getBlockNumber().then(Number);

// Get the balances synced block from the DB
const startBlock = await getLastSyncedBlock(chain, syncCategory);
let startBlock = await getLastSyncedBlock(chain, syncCategory);
if (!syncStore) {
// try to get all events for these poolIds
startBlock = endBlock - config[chain].rpcMaxBlockRange + 5;
}

// Don't use RPC when balances weren't synced at all
if (startBlock === 0) {
Expand Down Expand Up @@ -108,7 +113,7 @@ export const syncBptBalancesFromRpc = async (

console.log(`syncBptBalancesFromRpc ${syncCategory} on ${chain} got ${poolShares.length} poolShares`);

const operations = balancesToDb(poolShares, endBlock, syncCategory);
const operations = balancesToDb(poolShares, endBlock, syncStore ? syncCategory : undefined);
try {
await prisma.$transaction(operations);
} catch (e: any) {
Expand Down
14 changes: 10 additions & 4 deletions modules/controllers/cow-amm-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { syncBptBalancesFromSubgraph } from '../actions/user/bpt-balances/helper
import { getLastSyncedBlock, upsertLastSyncedBlock } from '../actions/last-synced-block';
import { updateLifetimeValues } from '../actions/pool/update-liftetime-values';
import { syncTokenPairs } from '../actions/pool/v3/sync-tokenpairs';
import { handleSubgraphErrors } from './error-handling';
import { syncBptBalancesFromRpc } from '../actions/user/bpt-balances/helpers/sync-bpt-balances-from-rpc';

export function CowAmmController(tracer?: any) {
const getSubgraphClient = (chain: Chain) => {
Expand Down Expand Up @@ -104,15 +106,19 @@ export function CowAmmController(tracer?: any) {
await updateSurplusAPRs(chain, ids);
// Sync balances for the pools
const newIds = ids.filter((id) => !existingIds.includes(id));
await syncBptBalancesFromSubgraph(newIds, subgraphClient, chain);
if (useSubgraph) {
await syncBptBalancesFromSubgraph(newIds, subgraphClient, chain);
} else {
await syncBptBalancesFromRpc(newIds, viemClient, chain, 'BPT_BALANCES_COW_AMM', false);
}

await upsertLastSyncedBlock(chain, PrismaLastBlockSyncedCategory.COW_AMM_POOLS, toBlock);

return ids;
},
async syncSnapshots(chain: Chain) {
const subgraphClient = getSubgraphClient(chain);
const ids = await syncSnapshots(subgraphClient, 'SNAPSHOTS_COW_AMM', chain);
const ids = await handleSubgraphErrors(() => syncSnapshots(subgraphClient, 'SNAPSHOTS_COW_AMM', chain));
// update lifetime values based on snapshots
await updateLifetimeValues(chain, undefined, 'COW_AMM');
return ids;
Expand All @@ -127,12 +133,12 @@ export function CowAmmController(tracer?: any) {
},
async syncJoinExits(chain: Chain) {
const subgraphClient = getSubgraphClient(chain);
const entries = await syncJoinExits(subgraphClient, chain);
const entries = await handleSubgraphErrors(() => syncJoinExits(subgraphClient, chain));
return entries;
},
async syncSwaps(chain: Chain) {
const subgraphClient = getSubgraphClient(chain);
const swaps = await syncSwaps(subgraphClient, chain);
const swaps = await handleSubgraphErrors(() => syncSwaps(subgraphClient, chain));
const poolIds = swaps
.map((event) => event.poolId)
.filter((value, index, self) => self.indexOf(value) === index);
Expand Down
14 changes: 14 additions & 0 deletions modules/controllers/error-handling.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export async function handleSubgraphErrors<T>(operation: () => Promise<T>): Promise<T | []> {
try {
return await operation();
} catch (error: any) {
if (
error.message.includes('Too many requests') ||
error.message.includes('bad indexers') ||
error.message.includes('Bad Gateway')
Comment on lines +6 to +8
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should let these errors still throw. We still need to know if crons fail because of it. I dont think its enough to just rely on the subgraph-lag alerts

) {
return [];
}
throw error;
}
}
29 changes: 15 additions & 14 deletions modules/controllers/event-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { syncSwaps as syncSwapsV3 } from '../actions/pool/v3/sync-swaps';
import { Chain } from '@prisma/client';
import { updateVolumeAndFees } from '../actions/pool/update-volume-and-fees';
import { getVaultSubgraphClient } from '../sources/subgraphs/balancer-v3-vault';
import { handleSubgraphErrors } from './error-handling';

export function EventController() {
return {
Expand All @@ -23,8 +24,7 @@ export function EventController() {
}

const subgraphClient = new BalancerSubgraphService(balancer, chain);
const entries = await syncJoinExitsV2(subgraphClient, chain);
return entries;
return handleSubgraphErrors(() => syncJoinExitsV2(subgraphClient, chain));
},
async syncSwapsUpdateVolumeAndFeesV2(chain: Chain) {
const {
Expand All @@ -37,11 +37,12 @@ export function EventController() {
}

const subgraphClient = getV2SubgraphClient(balancer, chain);
const poolsWithNewSwaps = await syncSwapsV2(subgraphClient, chain);
await syncSwapsForLast48Hours(subgraphClient, chain);
await updateVolumeAndFees(chain, poolsWithNewSwaps);

return poolsWithNewSwaps;
return handleSubgraphErrors(async () => {
const poolsWithNewSwaps = await syncSwapsV2(subgraphClient, chain);
await syncSwapsForLast48Hours(subgraphClient, chain);
await updateVolumeAndFees(chain, poolsWithNewSwaps);
return poolsWithNewSwaps;
});
},
async syncJoinExitsV3(chain: Chain) {
const {
Expand All @@ -54,8 +55,7 @@ export function EventController() {
}

const vaultSubgraphClient = getVaultSubgraphClient(balancerV3, chain);
const entries = await syncJoinExitsV3(vaultSubgraphClient, chain);
return entries;
return handleSubgraphErrors(() => syncJoinExitsV3(vaultSubgraphClient, chain));
},
async syncSwapsV3(chain: Chain) {
const {
Expand All @@ -68,8 +68,7 @@ export function EventController() {
}

const vaultSubgraphClient = getVaultSubgraphClient(balancerV3, chain);
const entries = await syncSwapsV3(vaultSubgraphClient, chain);
return entries;
return handleSubgraphErrors(() => syncSwapsV3(vaultSubgraphClient, chain));
},
async syncSwapsUpdateVolumeAndFeesV3(chain: Chain) {
const {
Expand All @@ -83,9 +82,11 @@ export function EventController() {

const vaultSubgraphClient = getVaultSubgraphClient(balancerV3, chain);

const poolsWithNewSwaps = await syncSwapsV3(vaultSubgraphClient, chain);
await updateVolumeAndFees(chain, poolsWithNewSwaps);
return poolsWithNewSwaps;
return handleSubgraphErrors(async () => {
const poolsWithNewSwaps = await syncSwapsV3(vaultSubgraphClient, chain);
await updateVolumeAndFees(chain, poolsWithNewSwaps);
return poolsWithNewSwaps;
});
},
};
}
3 changes: 2 additions & 1 deletion modules/controllers/pool-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ import { syncHookReviews } from '../actions/content/sync-hook-reviews';
import { HookData } from '../sources/transformers';
import { syncErc4626Tokens } from '../actions/token/sync-erc4626-tokens';
import { syncRateProviderReviews } from '../actions/content/sync-rate-provider-reviews';
import { handleSubgraphErrors } from './error-handling';

export function PoolController(tracer?: any) {
return {
async addPoolsV2(chain: Chain) {
const subgraphUrl = config[chain].subgraphs.balancer;
const subgraphService = getV2SubgraphClient(subgraphUrl, chain);

return addPoolsV2(subgraphService, chain);
return handleSubgraphErrors(() => addPoolsV2(subgraphService, chain));
},

async syncOnchainDataForAllPoolsV2(chain: Chain) {
Expand Down
6 changes: 5 additions & 1 deletion tasks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ async function run(job: string = process.argv[2], chainId: string = process.argv
} else if (job === 'sync-join-exits-v3') {
return EventController().syncJoinExitsV3(chain);
} else if (job === 'sync-join-exits-v2') {
return EventController().syncJoinExitsV2(chain);
// make a loop to hit the rate limit
while (true) {
await EventController().syncJoinExitsV2(chain);
}
// return EventController().syncJoinExitsV2(chain);
} else if (job === 'sync-swaps-v2') {
return EventController().syncSwapsUpdateVolumeAndFeesV2(chain);
} else if (job === 'sync-snapshots-v2') {
Expand Down