Skip to content

Commit f0ca94a

Browse files
authored
feat(webhooks): allow client registration through env (#118)
Signed-off-by: david <[email protected]>
1 parent df6c94d commit f0ca94a

File tree

10 files changed

+117
-33
lines changed

10 files changed

+117
-33
lines changed

packages/indexer-api/src/main.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,12 @@ export async function Main(
8787
const postgres = await connectToDatabase(postgresConfig, logger);
8888
const redisConfig = Indexer.parseRedisConfig(env);
8989
const redis = await initializeRedis(redisConfig, logger);
90-
const webhooks = Webhooks.WebhookFactory(
90+
const webhooks = await Webhooks.WebhookFactory(
9191
{
9292
enabledWebhooks: [Webhooks.WebhookTypes.DepositStatus],
9393
enabledWebhookRequestWorkers: false,
94+
// indexer will register clients
95+
clients: [],
9496
},
9597
{ postgres, logger, redis },
9698
);

packages/indexer-database/src/entities/WebhookClient.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Entity, Column, PrimaryGeneratedColumn, Unique } from "typeorm";
22

33
@Entity()
44
@Unique("UK_webhook_client_api_key", ["apiKey"])
5+
@Unique("UK_webhook_client_name", ["name"])
56
export class WebhookClient {
67
@PrimaryGeneratedColumn()
78
id: number;
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { MigrationInterface, QueryRunner } from "typeorm";
2+
3+
export class Webhook1732730161339 implements MigrationInterface {
4+
name = "Webhook1732730161339";
5+
6+
public async up(queryRunner: QueryRunner): Promise<void> {
7+
await queryRunner.query(
8+
`ALTER TABLE "webhook_client" ADD CONSTRAINT "UQ_a08bea1c3eba7711301141ae001" UNIQUE ("name")`,
9+
);
10+
}
11+
12+
public async down(queryRunner: QueryRunner): Promise<void> {
13+
await queryRunner.query(
14+
`ALTER TABLE "webhook_client" DROP CONSTRAINT "UQ_a08bea1c3eba7711301141ae001"`,
15+
);
16+
}
17+
}

packages/indexer/src/main.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,11 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
5656
const redisCache = new RedisCache(redis);
5757
const postgres = await connectToDatabase(postgresConfig, logger);
5858
// Call write to kick off webhook calls
59-
const { write } = WebhookFactory(
60-
{
61-
enabledWebhooks: [WebhookTypes.DepositStatus],
62-
enabledWebhookRequestWorkers: true,
63-
},
64-
{ postgres, logger, redis },
65-
);
59+
const { write } = await WebhookFactory(config.webhookConfig, {
60+
postgres,
61+
logger,
62+
redis,
63+
});
6664
// Retry providers factory
6765
const retryProvidersFactory = new RetryProvidersFactory(
6866
redisCache,

packages/indexer/src/parseEnv.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@ import * as s from "superstruct";
22
import { DatabaseConfig } from "@repo/indexer-database";
33
import { getNoTtlBlockDistance } from "./web3/constants";
44
import { assert } from "@repo/error-handling";
5+
import {
6+
Config as WebhooksConfig,
7+
WebhookTypes,
8+
parseWebhookClientsFromString,
9+
} from "@repo/webhooks";
510

611
export type Config = {
712
redisConfig: RedisConfig;
@@ -12,6 +17,7 @@ export type Config = {
1217
enableBundleEventsProcessor: boolean;
1318
enableBundleIncludedEventsService: boolean;
1419
enableBundleBuilder: boolean;
20+
webhookConfig: WebhooksConfig;
1521
};
1622
export type RedisConfig = {
1723
host: string;
@@ -182,6 +188,11 @@ export function envToConfig(env: Env): Config {
182188
`SPOKEPOOL_CHAINS_ENABLED=${chainId} but did not find any corresponding RPC_PROVIDER_URLS_${chainId}`,
183189
);
184190
});
191+
const webhookConfig = {
192+
enabledWebhooks: [WebhookTypes.DepositStatus],
193+
enabledWebhookRequestWorkers: true,
194+
clients: parseWebhookClientsFromString(env.WEBHOOK_CLIENTS ?? "[]"),
195+
};
185196
return {
186197
redisConfig,
187198
postgresConfig,
@@ -191,5 +202,6 @@ export function envToConfig(env: Env): Config {
191202
enableBundleEventsProcessor,
192203
enableBundleIncludedEventsService,
193204
enableBundleBuilder,
205+
webhookConfig,
194206
};
195207
}

packages/webhooks/src/database/webhookClientRepository.ts

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { entities, DataSource } from "@repo/indexer-database";
2+
import { exists } from "../utils";
23
import assert from "assert";
34

45
// This class is intended to store integration clients allowed to use the webhook service.
@@ -9,14 +10,30 @@ export class WebhookClientRepository {
910
this.repository = this.dataSource.getRepository(entities.WebhookClient);
1011
}
1112

12-
public async registerClient(client: entities.WebhookClient): Promise<void> {
13-
const existingClient = await this.repository.findOne({
14-
where: { id: client.id },
15-
});
16-
if (existingClient) {
17-
throw new Error(`Client with id ${client.id} already exists.`);
13+
public async registerClient(
14+
client: Omit<entities.WebhookClient, "id">,
15+
): Promise<entities.WebhookClient> {
16+
assert(
17+
!(await this.hasClientByName(client.name)),
18+
"Client with that name already exists",
19+
);
20+
const result = await this.repository.insert(client);
21+
return result.raw[0];
22+
}
23+
public async upsertClient(
24+
client: Omit<entities.WebhookClient, "id">,
25+
): Promise<entities.WebhookClient> {
26+
if (await this.hasClientByName(client.name)) {
27+
return this.updateClientByName(client);
28+
} else {
29+
return this.registerClient(client);
1830
}
19-
await this.repository.insert(client);
31+
}
32+
public async updateClientByName(
33+
client: Omit<entities.WebhookClient, "id">,
34+
): Promise<entities.WebhookClient> {
35+
const result = await this.repository.update({ name: client.name }, client);
36+
return result.raw[0];
2037
}
2138

2239
public async unregisterClient(clientId: number): Promise<void> {
@@ -40,12 +57,20 @@ export class WebhookClientRepository {
4057
public async listClients(): Promise<entities.WebhookClient[]> {
4158
return this.repository.find();
4259
}
43-
60+
public async hasClientByName(name: string): Promise<boolean> {
61+
const result = await this.repository.findOne({ where: { name } });
62+
return exists(result);
63+
}
64+
public async getClientByName(name: string): Promise<entities.WebhookClient> {
65+
const result = await this.repository.findOne({ where: { name } });
66+
assert(result, `Client by name: ${name} does not exist`);
67+
return result;
68+
}
4469
public async getClientByApiKey(
4570
apiKey: string,
4671
): Promise<entities.WebhookClient> {
4772
const result = await this.repository.findOne({ where: { apiKey } });
48-
assert(result, "Invalid api key");
73+
assert(result, `Client by apiKey: ${apiKey} does not exist`);
4974
return result;
5075
}
5176

packages/webhooks/src/eventProcessorManager.ts

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export type Dependencies = {
2929
postgres: DataSource;
3030
logger: Logger;
3131
webhooksQueuesService: WebhooksQueuesService;
32+
clientRepository: WebhookClientRepository;
3233
};
3334
export class EventProcessorManager {
3435
private logger: Logger;
@@ -38,8 +39,8 @@ export class EventProcessorManager {
3839

3940
constructor(deps: Dependencies) {
4041
this.logger = deps.logger;
41-
this.clientRepository = new WebhookClientRepository(deps.postgres); // Initialize the client manager
4242
this.webhooksQueuesService = deps.webhooksQueuesService;
43+
this.clientRepository = deps.clientRepository;
4344
}
4445

4546
// Register a new type of webhook processor able to be written to
@@ -76,12 +77,13 @@ export class EventProcessorManager {
7677
`Attempting to register webhook of type: ${params.type} with URL: ${params.url}`,
7778
);
7879
const client = await this.clientRepository.getClientByApiKey(apiKey);
79-
const urlDomain = new URL(params.url).hostname;
80-
const isDomainValid = client.domains.includes(urlDomain);
81-
assert(
82-
isDomainValid,
83-
"The base URL of the provided webhook does not match any of the client domains",
84-
);
80+
// TODO: Reinable this potentially when we need it, but not great for testing
81+
// const urlDomain = new URL(params.url).hostname;
82+
// const isDomainValid = client.domains.includes(urlDomain);
83+
// assert(
84+
// isDomainValid,
85+
// "The base URL of the provided webhook does not match any of the client domains",
86+
// );
8587
assert((params.filter as any).depositTxHash, "depositTxHash is required");
8688
assert((params.filter as any).originChainId, "originChainId is required");
8789
const webhook = this.getEventProcessor(params.type as WebhookTypes);
@@ -119,10 +121,4 @@ export class EventProcessorManager {
119121
`Successfully unregistered webhook with ID: ${params.id}`,
120122
);
121123
}
122-
123-
async registerClient(client: entities.WebhookClient) {
124-
this.logger.debug(`Attempting to register client with ID: ${client.id}`);
125-
await this.clientRepository.registerClient(client);
126-
this.logger.debug(`Successfully registered client with ID: ${client.id}`);
127-
}
128124
}

packages/webhooks/src/factory.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Logger } from "winston";
22
import { Redis } from "ioredis";
33

4-
import { DataSource } from "@repo/indexer-database";
4+
import { DataSource, entities } from "@repo/indexer-database";
55
import { assert } from "@repo/error-handling";
66

77
import { EventProcessorManager } from "./eventProcessorManager";
@@ -10,6 +10,8 @@ import { DepositStatusProcessor } from "./eventProcessors";
1010
import { WebhookRouter } from "./router";
1111
import { WebhooksQueuesService } from "./adapter/messaging/WebhooksQueuesService";
1212
import { WebhookRequestWorker } from "./adapter/messaging/WebhookRequestWorker";
13+
import { WebhookClientRepository } from "./database/webhookClientRepository";
14+
import { PartialWebhookClients } from "./types";
1315

1416
export enum WebhookTypes {
1517
DepositStatus = "DepositStatus",
@@ -18,25 +20,38 @@ export enum WebhookTypes {
1820
export type Config = {
1921
enabledWebhooks: WebhookTypes[];
2022
enabledWebhookRequestWorkers: boolean;
23+
clients: PartialWebhookClients;
2124
};
2225
type Dependencies = {
2326
postgres: DataSource;
2427
redis: Redis;
2528
logger: Logger;
2629
};
2730

28-
export function WebhookFactory(config: Config, deps: Dependencies) {
31+
export async function WebhookFactory(config: Config, deps: Dependencies) {
2932
const { logger, postgres, redis } = deps;
3033
const notifier = new WebhookNotifier({ logger });
3134
assert(
3235
config.enabledWebhooks.length,
3336
"No webhooks enabled, specify one in config",
3437
);
3538
const webhooksQueuesService = new WebhooksQueuesService(redis);
39+
const clientRepository = new WebhookClientRepository(postgres);
3640
const eventProcessorManager = new EventProcessorManager({
3741
postgres,
3842
logger,
3943
webhooksQueuesService,
44+
clientRepository,
45+
});
46+
const clientRegistrations = await Promise.all(
47+
config.clients.map((client) => {
48+
return clientRepository.upsertClient(client);
49+
}),
50+
);
51+
logger.info({
52+
message: "Registered webhook api clients",
53+
at: "Webhooks package factory",
54+
clientRegistrations,
4055
});
4156
config.enabledWebhooks.forEach((name) => {
4257
switch (name) {

packages/webhooks/src/types.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,13 @@ export const UnregisterParams = ss.object({
3737
id: ss.string(),
3838
});
3939
export type UnregisterParams = ss.Infer<typeof UnregisterParams>;
40+
41+
export const PartialWebhookClient = ss.type({
42+
name: ss.string(),
43+
apiKey: ss.string(),
44+
domains: ss.array(ss.string()),
45+
});
46+
export type PartialWebhookClient = ss.Infer<typeof PartialWebhookClient>;
47+
48+
export const PartialWebhookClients = ss.array(PartialWebhookClient);
49+
export type PartialWebhookClients = ss.Infer<typeof PartialWebhookClients>;

packages/webhooks/src/utils.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { NotificationPayload } from "./types";
1+
import { NotificationPayload, PartialWebhookClients } from "./types";
2+
import * as ss from "superstruct";
23
export async function post(params: NotificationPayload): Promise<void> {
34
const { url, data, apiKey } = params;
45
const response = await fetch(url, {
@@ -42,3 +43,10 @@ export function exists<T>(val: T | null | undefined): val is T {
4243
export function customId(...args: (string | number)[]): string {
4344
return args.join("!");
4445
}
46+
47+
export function parseWebhookClientsFromString(
48+
envStr: string,
49+
): PartialWebhookClients {
50+
const clients = JSON.parse(envStr);
51+
return ss.create(clients, PartialWebhookClients);
52+
}

0 commit comments

Comments
 (0)