Skip to content

Commit bb2a4aa

Browse files
committed
Integrate pubsub lib
1 parent 8b196a5 commit bb2a4aa

File tree

9 files changed

+426
-137
lines changed

9 files changed

+426
-137
lines changed

packages/indexer-database/src/entities/CctpFinalizerJob.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ export class CctpFinalizerJob {
2424
@Column()
2525
attestation: string;
2626

27+
@Column()
28+
message: string;
29+
2730
@Column()
2831
burnEventId: number;
2932

packages/indexer-database/src/migrations/1760541259411-CctpFinalizerJob.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export class CctpFinalizerJob1760541259411 implements MigrationInterface {
88
CREATE TABLE "cctp_finalizer_job" (
99
"id" SERIAL NOT NULL,
1010
"attestation" character varying NOT NULL,
11+
"message" character varying NOT NULL,
1112
"burnEventId" integer NOT NULL,
1213
"createdAt" TIMESTAMP NOT NULL DEFAULT now(),
1314
"updatedAt" TIMESTAMP NOT NULL DEFAULT now(),

packages/indexer/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
"@across-protocol/constants": "^3.1.79",
2525
"@across-protocol/contracts": "^4.1.9",
2626
"@across-protocol/sdk": "^4.3.67",
27+
"@google-cloud/pubsub": "^5.2.0",
2728
"@repo/error-handling": "workspace:*",
2829
"@repo/webhooks": "workspace:*",
2930
"@solana/kit": "^2.1.0",

packages/indexer/src/data-indexing/adapter/cctp-v2/service.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import { ethers } from "ethers";
2+
import axios from "axios";
23
import {
34
CCTP_NO_DOMAIN,
45
CHAIN_IDs,
56
PRODUCTION_NETWORKS,
7+
PUBLIC_NETWORKS,
68
TEST_NETWORKS,
79
} from "@across-protocol/constants";
810
import * as across from "@across-protocol/sdk";
@@ -81,6 +83,19 @@ export function decodeMessage(message: string, isSvm = false) {
8183
};
8284
}
8385

86+
/**
87+
* @notice Returns the CCTP domain for a given chain ID. Throws if the chain ID is not a CCTP domain.
88+
* @param chainId
89+
* @returns CCTP Domain ID
90+
*/
91+
export function getCctpDomainForChainId(chainId: number): number {
92+
const cctpDomain = PUBLIC_NETWORKS[chainId]?.cctpDomain;
93+
if (!across.utils.isDefined(cctpDomain) || cctpDomain === CCTP_NO_DOMAIN) {
94+
throw new Error(`No CCTP domain found for chainId: ${chainId}`);
95+
}
96+
return cctpDomain;
97+
}
98+
8499
export function getCctpDestinationChainFromDomain(
85100
domain: number,
86101
productionNetworks: boolean = true,
@@ -111,3 +126,27 @@ export function getCctpDestinationChainFromDomain(
111126
}
112127
return parseInt(chainId);
113128
}
129+
130+
export type CCTPV2APIGetAttestationResponse = {
131+
messages: CCTPV2APIAttestation[];
132+
};
133+
134+
export type CCTPV2APIAttestation = {
135+
eventNonce: string;
136+
status: string;
137+
attestation: string;
138+
message: string;
139+
};
140+
141+
export async function fetchAttestationsForTxn(
142+
sourceDomainId: number,
143+
transactionHash: string,
144+
isMainnet: boolean,
145+
): Promise<CCTPV2APIGetAttestationResponse> {
146+
const httpResponse = await axios.get<CCTPV2APIGetAttestationResponse>(
147+
`https://iris-api${
148+
isMainnet ? "" : "-sandbox"
149+
}.circle.com/v2/messages/${sourceDomainId}?transactionHash=${transactionHash}`,
150+
);
151+
return httpResponse.data;
152+
}

packages/indexer/src/data-indexing/service/CCTPIndexerManager.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { RetryProvidersFactory } from "../../web3/RetryProvidersFactory";
1414
import { CCTPIndexerDataHandler } from "./CCTPIndexerDataHandler";
1515
import { CCTPRepository } from "../../database/CctpRepository";
1616

17-
const MAX_BLOCK_RANGE_SIZE = 10000;
17+
const MAX_BLOCK_RANGE_SIZE = 30000;
1818

1919
export class CCTPIndexerManager {
2020
private evmIndexer?: Indexer;
@@ -69,7 +69,8 @@ export class CCTPIndexerManager {
6969
);
7070
const indexer = new EvmIndexer(
7171
{
72-
indexingDelaySeconds: getIndexingDelaySeconds(chainId, this.config),
72+
indexingDelaySeconds:
73+
getIndexingDelaySeconds(chainId, this.config) * 2,
7374
finalisedBlockBufferDistance:
7475
getFinalisedBlockBufferDistance(chainId),
7576
maxBlockRangeSize: MAX_BLOCK_RANGE_SIZE,

packages/indexer/src/data-indexing/service/CctpFinalizerService.ts

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@ import winston, { Logger } from "winston";
22
import axios from "axios";
33

44
import { RepeatableTask } from "../../generics";
5-
import { DataSource } from "@repo/indexer-database";
5+
import { DataSource, entities } from "@repo/indexer-database";
66
import {
77
CctpFinalizerJob,
88
DepositForBurn,
99
} from "../../../../indexer-database/dist/src/entities";
1010
import { CHAIN_IDs } from "@across-protocol/constants";
1111
import { PubSubService } from "../../pubsub/service";
1212
import { Config } from "../../parseEnv";
13+
import {
14+
fetchAttestationsForTxn,
15+
getCctpDestinationChainFromDomain,
16+
getCctpDomainForChainId,
17+
} from "../adapter/cctp-v2/service";
1318

1419
export const CCTP_FINALIZER_DELAY_SECONDS = 10;
1520

@@ -136,6 +141,43 @@ class CctpFinalizerService extends RepeatableTask {
136141
});
137142
return;
138143
}
144+
const attestations = await fetchAttestationsForTxn(
145+
getCctpDomainForChainId(Number(burnEvent.chainId)),
146+
transactionHash,
147+
true,
148+
);
149+
if (attestations.messages.length === 0) {
150+
this.logger.debug({
151+
at: "CctpFinalizerService#publishBurnEvent",
152+
message: "No attestations found for burn event",
153+
chainId,
154+
transactionHash,
155+
burnEvent,
156+
});
157+
return;
158+
}
159+
const { attestation, eventNonce, message, status } =
160+
attestations.messages[0]!;
161+
if (status !== "complete") {
162+
this.logger.debug({
163+
at: "CctpFinalizerService#publishBurnEvent",
164+
message: "Attestation is not complete",
165+
chainId,
166+
transactionHash,
167+
burnEvent,
168+
attestations,
169+
});
170+
return;
171+
}
172+
173+
await this.postgres
174+
.createQueryBuilder(entities.MessageSent, "ms")
175+
.update()
176+
.set({
177+
nonce: eventNonce,
178+
})
179+
.where("id = :id", { id: burnEvent.id })
180+
.execute();
139181
this.logger.debug({
140182
at: "CctpFinalizerService#publishBurnEvent",
141183
message: "Publishing burn event to pubsub",
@@ -146,16 +188,23 @@ class CctpFinalizerService extends RepeatableTask {
146188
attestationTimeSeconds,
147189
elapsedSeconds,
148190
});
191+
const destinationChainId = getCctpDestinationChainFromDomain(
192+
burnEvent.destinationDomain,
193+
);
149194
await this.pubSubService.publishCctpFinalizerMessage(
150195
transactionHash,
151196
Number(chainId),
197+
message,
198+
attestation,
199+
destinationChainId,
152200
);
153201

154202
await this.postgres
155203
.createQueryBuilder(CctpFinalizerJob, "j")
156204
.insert()
157205
.values({
158-
attestation: "",
206+
attestation,
207+
message,
159208
burnEventId: burnEvent.id,
160209
})
161210
.orUpdate(["attestation"], ["burnEventId"])

packages/indexer/src/parseEnv.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ export type Config = {
2121
enableBundleBuilder: boolean;
2222
enableCctpIndexer: boolean;
2323
enableCctpFinalizer: boolean;
24-
finalizerPubSubTopic: string;
24+
pubSubCctpFinalizerTopic: string;
25+
pubSubGcpProjectId: string;
2526
enableOftIndexer: boolean;
2627
webhookConfig: WebhooksConfig;
2728
maxBlockRangeSize?: number;
@@ -226,7 +227,8 @@ export function envToConfig(env: Env): Config {
226227
const enableCctpFinalizer = env.ENABLE_CCTP_FINALIZER
227228
? env.ENABLE_CCTP_FINALIZER === "true"
228229
: false;
229-
const finalizerPubSubTopic = env.CCTP_FINALIZER_PUBSUB_TOPIC ?? "";
230+
const pubSubCctpFinalizerTopic = env.PUBSUB_CCTP_FINALIZER_TOPIC ?? "";
231+
const pubSubGcpProjectId = env.PUBSUB_GCP_PROJECT_ID ?? "";
230232
const enableBundleIncludedEventsService =
231233
env.ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE
232234
? env.ENABLE_BUNDLE_INCLUDED_EVENTS_SERVICE === "true"
@@ -286,7 +288,8 @@ export function envToConfig(env: Env): Config {
286288
enableCctpIndexer,
287289
enableOftIndexer,
288290
enableCctpFinalizer,
289-
finalizerPubSubTopic,
291+
pubSubCctpFinalizerTopic,
292+
pubSubGcpProjectId,
290293
webhookConfig,
291294
maxBlockRangeSize,
292295
coingeckoApiKey,
Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,43 @@
1-
import axios from "axios";
1+
import { PubSub, Topic } from "@google-cloud/pubsub";
22
import { Config } from "../parseEnv";
33

44
/**
55
* Helper class to publish messages to a GCP pubsub topic.
66
*/
77
export class PubSubService {
8-
constructor(private readonly config: Config) {}
8+
private readonly pubSub: PubSub;
9+
private cctpFinalizerTopic: Topic;
10+
11+
constructor(private readonly config: Config) {
12+
this.pubSub = new PubSub({
13+
projectId: this.config.pubSubGcpProjectId,
14+
});
15+
}
916

1017
async publishCctpFinalizerMessage(
1118
burnTransactionHash: string,
1219
sourceChainId: number,
20+
message: string,
21+
attestation: string,
22+
destinationChainId: number,
1323
) {
24+
if (!this.cctpFinalizerTopic) {
25+
const topic = await this.pubSub.topic(
26+
this.config.pubSubCctpFinalizerTopic,
27+
);
28+
this.cctpFinalizerTopic = topic;
29+
}
1430
// the published payload is a base64 encoded JSON string. The JSON is
1531
// validated by the Avro schema defined in GCP
1632
const payload = Buffer.from(
1733
JSON.stringify({
1834
burnTransactionHash: burnTransactionHash,
1935
sourceChainId,
36+
message,
37+
attestation,
38+
destinationChainId,
2039
}),
21-
).toString("base64");
22-
const body = { messages: [{ data: payload }] };
23-
const response = await axios.post(this.config.finalizerPubSubTopic, body, {
24-
headers: {
25-
"Content-Type": "application/json",
26-
// TODO: this authorization method is temporary for local testing.
27-
// It must be replaced with a proper authentication method.
28-
Authorization: `Bearer <AUTH_TOKEN>`,
29-
},
30-
});
31-
32-
return response.data;
40+
);
41+
await this.cctpFinalizerTopic.publishMessage({ data: payload });
3342
}
3443
}

0 commit comments

Comments
 (0)