Skip to content

Commit 15acb65

Browse files
feat: assign spoke pool events to bundles (#77)
1 parent 8ba51e5 commit 15acb65

File tree

13 files changed

+578
-73
lines changed

13 files changed

+578
-73
lines changed

packages/indexer-database/src/entities/Bundle.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { RootBundleCanceled } from "./evm/RootBundleCanceled";
1313
import { RootBundleExecuted } from "./evm/RootBundleExecuted";
1414
import { RootBundleDisputed } from "./evm/RootBundleDisputed";
1515
import { BundleBlockRange } from "./BundleBlockRange";
16+
import { BundleEvent } from "./BundleEvent";
1617

1718
export enum BundleStatus {
1819
Proposed = "Proposed",
@@ -89,4 +90,10 @@ export class Bundle {
8990
nullable: false,
9091
})
9192
ranges: BundleBlockRange[];
93+
94+
@Column({ default: false })
95+
eventsAssociated: boolean;
96+
97+
@OneToMany(() => BundleEvent, (event) => event.bundle)
98+
events: BundleEvent[];
9299
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import {
2+
Column,
3+
Entity,
4+
ManyToOne,
5+
PrimaryGeneratedColumn,
6+
Unique,
7+
} from "typeorm";
8+
import { Bundle } from "./Bundle";
9+
10+
export enum BundleEventType {
11+
Deposit = "deposit",
12+
ExpiredDeposit = "expiredDeposit",
13+
Fill = "fill",
14+
SlowFill = "slowFill",
15+
UnexecutableSlowFill = "unexecutableSlowFill",
16+
}
17+
18+
@Entity()
19+
@Unique("UK_bundleEvent_eventType_relayHash", ["type", "relayHash"])
20+
export class BundleEvent {
21+
@PrimaryGeneratedColumn()
22+
id: number;
23+
24+
@ManyToOne(() => Bundle, (bundle) => bundle.events)
25+
bundle: Bundle;
26+
27+
@Column()
28+
bundleId: number;
29+
30+
@Column({ type: "enum", enum: BundleEventType })
31+
type: BundleEventType;
32+
33+
@Column()
34+
relayHash: string;
35+
36+
@Column({ nullable: true })
37+
repaymentChainId: number;
38+
}

packages/indexer-database/src/entities/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export * from "./evm/TokensBridged";
1515

1616
// Others
1717
export * from "./Bundle";
18+
export * from "./BundleEvent";
1819
export * from "./BundleBlockRange";
1920
export * from "./RootBundleExecutedJoinTable";
2021
export * from "./RelayHashInfo";

packages/indexer-database/src/main.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export const createDataSource = (config: DatabaseConfig): DataSource => {
3939
// Bundle
4040
entities.Bundle,
4141
entities.BundleBlockRange,
42+
entities.BundleEvent,
4243
entities.RootBundleExecutedJoinTable,
4344
// Others
4445
entities.RelayHashInfo,
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { MigrationInterface, QueryRunner } from "typeorm";
2+
3+
export class BundleEvent1729644581993 implements MigrationInterface {
4+
name = "BundleEvent1729644581993";
5+
6+
public async up(queryRunner: QueryRunner): Promise<void> {
7+
await queryRunner.query(
8+
`CREATE TYPE "public"."bundle_event_type_enum" AS ENUM('deposit', 'expiredDeposit', 'fill', 'slowFill', 'unexecutableSlowFill')`,
9+
);
10+
await queryRunner.query(
11+
`CREATE TABLE "bundle_event" (
12+
"id" SERIAL NOT NULL,
13+
"bundleId" integer NOT NULL,
14+
"type" "public"."bundle_event_type_enum" NOT NULL,
15+
"relayHash" character varying NOT NULL,
16+
"repaymentChainId" integer,
17+
CONSTRAINT "UK_bundleEvent_eventType_relayHash" UNIQUE ("type", "relayHash"),
18+
CONSTRAINT "PK_d633122fa4b52768e1b588bddee" PRIMARY KEY ("id"))`,
19+
);
20+
await queryRunner.query(
21+
`ALTER TABLE "bundle" ADD "eventsAssociated" boolean NOT NULL DEFAULT false`,
22+
);
23+
await queryRunner.query(
24+
`ALTER TABLE "bundle_event" ADD CONSTRAINT "FK_62dcd4f6f0d1713fab0c8542dba" FOREIGN KEY ("bundleId") REFERENCES "bundle"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`,
25+
);
26+
}
27+
28+
public async down(queryRunner: QueryRunner): Promise<void> {
29+
await queryRunner.query(
30+
`ALTER TABLE "bundle_event" DROP CONSTRAINT "FK_62dcd4f6f0d1713fab0c8542dba"`,
31+
);
32+
await queryRunner.query(
33+
`ALTER TABLE "bundle" DROP COLUMN "eventsAssociated"`,
34+
);
35+
await queryRunner.query(`DROP TABLE "bundle_event"`);
36+
await queryRunner.query(`DROP TYPE "public"."bundle_event_type_enum"`);
37+
}
38+
}

packages/indexer/src/database/BundleRepository.ts

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import winston from "winston";
2+
import * as across from "@across-protocol/sdk";
23
import { DataSource, entities, utils } from "@repo/indexer-database";
34

45
export type BlockRangeInsertType = {
@@ -32,6 +33,7 @@ export class BundleRepository extends utils.BaseRepository {
3233
postgres: DataSource,
3334
logger: winston.Logger,
3435
throwError?: boolean,
36+
private chunkSize = 2000,
3537
) {
3638
super(postgres, logger, throwError);
3739
}
@@ -371,4 +373,182 @@ export class BundleRepository extends utils.BaseRepository {
371373

372374
return (await executedUpdateQuery.execute())?.affected ?? 0;
373375
}
376+
377+
/**
378+
* Retrieves executed bundles that do not have events associated with them.
379+
* The query can be filtered by the block number and a limit on the number of results returned.
380+
* @param filters - Optional filters for the query.
381+
* @param filters.fromBlock - If provided, retrieves bundles where the proposal's block number is greater than this value.
382+
* @param limit - The maximum number of bundles to retrieve.
383+
* @returns An array of bundles that match the given criteria.
384+
*/
385+
public async getExecutedBundlesWithoutEventsAssociated(
386+
filters: {
387+
fromBlock?: number;
388+
},
389+
limit = 5,
390+
): Promise<entities.Bundle[]> {
391+
const bundleRepo = this.postgres.getRepository(entities.Bundle);
392+
const query = bundleRepo
393+
.createQueryBuilder("b")
394+
.select(["b", "proposal", "ranges"])
395+
.leftJoinAndSelect("b.ranges", "ranges")
396+
.leftJoinAndSelect("b.proposal", "proposal")
397+
.where("b.status = :executed", {
398+
executed: entities.BundleStatus.Executed,
399+
})
400+
.andWhere("b.eventsAssociated = false");
401+
if (filters.fromBlock) {
402+
query.andWhere("proposal.blockNumber > :fromBlock", {
403+
fromBlock: filters.fromBlock,
404+
});
405+
}
406+
return query.orderBy("proposal.blockNumber", "DESC").take(limit).getMany();
407+
}
408+
409+
/**
410+
* Updates the `eventsAssociated` flag to `true` for a specific bundle.
411+
* @param bundleId - The ID of the bundle to update.
412+
* @returns A promise that resolves when the update is complete.
413+
*/
414+
public async updateBundleEventsAssociatedFlag(bundleId: number) {
415+
const bundleRepo = this.postgres.getRepository(entities.Bundle);
416+
const updatedBundle = await bundleRepo
417+
.createQueryBuilder()
418+
.update()
419+
.set({ eventsAssociated: true })
420+
.where("id = :id", { id: bundleId })
421+
.execute();
422+
return updatedBundle.affected;
423+
}
424+
425+
/**
426+
* Stores bundle events relating them to a given bundle.
427+
* @param bundleData The reconstructed bundle data.
428+
* @param bundleId ID of the bundle to associate these events with.
429+
* @returns A promise that resolves when all the events have been inserted into the database.
430+
*/
431+
public async storeBundleEvents(
432+
bundleData: across.interfaces.LoadDataReturnValue,
433+
bundleId: number,
434+
) {
435+
const eventsRepo = this.postgres.getRepository(entities.BundleEvent);
436+
437+
// Store bundle deposits
438+
const deposits = this.formatBundleEvents(
439+
entities.BundleEventType.Deposit,
440+
bundleData.bundleDepositsV3,
441+
bundleId,
442+
);
443+
const chunkedDeposits = across.utils.chunk(deposits, this.chunkSize);
444+
await Promise.all(
445+
chunkedDeposits.map((eventsChunk) => eventsRepo.insert(eventsChunk)),
446+
);
447+
448+
// Store bundle refunded deposits
449+
const expiredDeposits = this.formatBundleEvents(
450+
entities.BundleEventType.ExpiredDeposit,
451+
bundleData.expiredDepositsToRefundV3,
452+
bundleId,
453+
);
454+
const chunkedRefunds = across.utils.chunk(expiredDeposits, this.chunkSize);
455+
await Promise.all(
456+
chunkedRefunds.map((eventsChunk) => eventsRepo.insert(eventsChunk)),
457+
);
458+
459+
// Store bundle slow fills
460+
const slowFills = this.formatBundleEvents(
461+
entities.BundleEventType.SlowFill,
462+
bundleData.bundleSlowFillsV3,
463+
bundleId,
464+
);
465+
const chunkedSlowFills = across.utils.chunk(slowFills, this.chunkSize);
466+
await Promise.all(
467+
chunkedSlowFills.map((eventsChunk) => eventsRepo.insert(eventsChunk)),
468+
);
469+
470+
// Store bundle unexecutable slow fills
471+
const unexecutableSlowFills = this.formatBundleEvents(
472+
entities.BundleEventType.UnexecutableSlowFill,
473+
bundleData.unexecutableSlowFills,
474+
bundleId,
475+
);
476+
const chunkedUnexecutableSlowFills = across.utils.chunk(
477+
unexecutableSlowFills,
478+
this.chunkSize,
479+
);
480+
await Promise.all(
481+
chunkedUnexecutableSlowFills.map((eventsChunk) =>
482+
eventsRepo.insert(eventsChunk),
483+
),
484+
);
485+
486+
// Store bundle fills
487+
const fills = this.formatBundleFillEvents(
488+
entities.BundleEventType.Fill,
489+
bundleData.bundleFillsV3,
490+
bundleId,
491+
);
492+
const chunkedFills = across.utils.chunk(fills, this.chunkSize);
493+
await Promise.all(
494+
chunkedFills.map((eventsChunk) => eventsRepo.insert(eventsChunk)),
495+
);
496+
497+
return {
498+
deposits: deposits.length,
499+
expiredDeposits: expiredDeposits.length,
500+
slowFills: slowFills.length,
501+
unexecutableSlowFills: unexecutableSlowFills.length,
502+
fills: fills.length,
503+
};
504+
}
505+
506+
private formatBundleEvents(
507+
eventsType: entities.BundleEventType,
508+
bundleEvents:
509+
| across.interfaces.BundleDepositsV3
510+
| across.interfaces.BundleSlowFills
511+
| across.interfaces.BundleExcessSlowFills
512+
| across.interfaces.ExpiredDepositsToRefundV3,
513+
bundleId: number,
514+
): {
515+
bundleId: number;
516+
relayHash: string;
517+
type: entities.BundleEventType;
518+
}[] {
519+
return Object.values(bundleEvents).flatMap((tokenEvents) =>
520+
Object.values(tokenEvents).flatMap((events) =>
521+
events.map((event) => {
522+
return {
523+
bundleId,
524+
relayHash: across.utils.getRelayHashFromEvent(event),
525+
type: eventsType,
526+
};
527+
}),
528+
),
529+
);
530+
}
531+
532+
private formatBundleFillEvents(
533+
eventsType: entities.BundleEventType.Fill,
534+
bundleEvents: across.interfaces.BundleFillsV3,
535+
bundleId: number,
536+
): {
537+
bundleId: number;
538+
relayHash: string;
539+
type: entities.BundleEventType.Fill;
540+
}[] {
541+
return Object.entries(bundleEvents).flatMap(([chainId, tokenEvents]) =>
542+
Object.values(tokenEvents).flatMap((fillsData) =>
543+
fillsData.fills.map((event) => {
544+
return {
545+
bundleId,
546+
relayHash: across.utils.getRelayHashFromEvent(event),
547+
type: eventsType,
548+
repaymentChainId: Number(chainId),
549+
};
550+
}),
551+
),
552+
);
553+
}
374554
}

packages/indexer/src/main.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
8383
logger,
8484
redis,
8585
postgres,
86+
hubPoolClientFactory,
87+
spokePoolClientFactory,
8688
});
8789

8890
const spokePoolIndexers = spokePoolChainsEnabled.map((chainId) => {

packages/indexer/src/services/BundleBuilderService.ts

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { BundleLeavesCache } from "../redis/bundleLeavesCache";
99
import { HubPoolBalanceCache } from "../redis/hubBalancesCache";
1010
import {
1111
BN_ZERO,
12+
buildPoolRebalanceRoot,
1213
ConfigStoreClientFactory,
1314
convertProposalRangeResultToProposalRange,
1415
getBlockRangeBetweenBundles,
@@ -398,30 +399,17 @@ export class BundleBuilderService extends BaseIndexer {
398399
chainsToBuildBundleFor,
399400
);
400401
// Load the bundle data
401-
const {
402-
bundleDepositsV3,
403-
expiredDepositsToRefundV3,
404-
bundleFillsV3,
405-
unexecutableSlowFills,
406-
bundleSlowFillsV3,
407-
} = await bundleDataClient.loadData(
402+
const bundleData = await bundleDataClient.loadData(
408403
bundleRangeForBundleClient,
409404
spokeClients,
410405
false,
411406
);
412407
// Build pool rebalance root and resolve the leaves
413-
const { leaves } = clients.BundleDataClient._buildPoolRebalanceRoot(
414-
bundleRangeForBundleClient[0]![1]!, // Mainnet is always the first chain. Second element is the end block
415-
bundleRangeForBundleClient[0]![1]!, // Mainnet is always the first chain. Second element is the end block
416-
bundleDepositsV3,
417-
bundleFillsV3,
418-
bundleSlowFillsV3,
419-
unexecutableSlowFills,
420-
expiredDepositsToRefundV3,
421-
{
422-
hubPoolClient,
423-
configStoreClient,
424-
},
408+
const { leaves } = buildPoolRebalanceRoot(
409+
bundleRangeForBundleClient,
410+
bundleData,
411+
hubPoolClient,
412+
configStoreClient,
425413
);
426414
// Map the leaves to the desired format
427415
return leaves.map((leaf) => ({

0 commit comments

Comments
 (0)