Skip to content

Commit 60ff122

Browse files
authored
chore: Allow overriding Contract Subscription cron (#882)
1 parent b8422ea commit 60ff122

File tree

6 files changed

+58
-61
lines changed

6 files changed

+58
-61
lines changed

package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
"aws-kms-signer": "^0.5.3",
4949
"base-64": "^1.0.0",
5050
"bullmq": "^5.11.0",
51+
"cron": "^4.3.0",
5152
"cron-parser": "^4.9.0",
5253
"crypto": "^1.0.1",
5354
"crypto-js": "^4.2.0",
@@ -63,7 +64,6 @@
6364
"jsonwebtoken": "^9.0.2",
6465
"knex": "^3.1.0",
6566
"mnemonist": "^0.39.8",
66-
"node-cron": "^3.0.2",
6767
"ox": "0.6.9",
6868
"pg": "^8.11.3",
6969
"prisma": "^5.14.0",
@@ -82,7 +82,6 @@
8282
"@types/crypto-js": "^4.2.2",
8383
"@types/jsonwebtoken": "^9.0.6",
8484
"@types/node": "^18.15.4",
85-
"@types/node-cron": "^3.0.8",
8685
"@types/pg": "^8.6.6",
8786
"@types/uuid": "^9.0.1",
8887
"@types/ws": "^8.5.5",
Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
import cron from "node-cron";
1+
import { CronJob } from "cron";
22
import { clearCache } from "../cache/clear-cache";
33
import { getConfig } from "../cache/get-config";
44
import type { env } from "../env";
55

6-
let task: cron.ScheduledTask;
6+
let task: CronJob;
7+
78
export const clearCacheCron = async (
89
service: (typeof env)["LOG_SERVICES"][0],
910
) => {
@@ -13,11 +14,13 @@ export const clearCacheCron = async (
1314
return;
1415
}
1516

17+
// Stop the existing task if it exists.
1618
if (task) {
1719
task.stop();
1820
}
1921

20-
task = cron.schedule(config.clearCacheCronSchedule, async () => {
22+
task = new CronJob(config.clearCacheCronSchedule, async () => {
2123
await clearCache(service);
2224
});
25+
task.start();
2326
};

src/shared/utils/env.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ export const env = createEnv({
8686
QUEUE_FAIL_HISTORY_COUNT: z.coerce.number().default(10_000),
8787
// Sets the number of recent nonces to map to queue IDs.
8888
NONCE_MAP_COUNT: z.coerce.number().default(10_000),
89-
// Sets the estimated number of blocks to query per contract subscription job. Defaults to 1 block (real-time).
90-
CONTRACT_SUBSCRIPTION_BLOCK_RANGE: z.coerce.number().default(1),
89+
// Overrides the cron schedule for contract subscription jobs.
90+
CONTRACT_SUBSCRIPTION_CRON_SCHEDULE_OVERRIDE: z.string().optional(),
9191

9292
ENABLE_KEYPAIR_AUTH: boolEnvSchema(false),
9393
ENABLE_CUSTOM_HMAC_AUTH: boolEnvSchema(false),
@@ -138,8 +138,8 @@ export const env = createEnv({
138138
QUEUE_COMPLETE_HISTORY_COUNT: process.env.QUEUE_COMPLETE_HISTORY_COUNT,
139139
QUEUE_FAIL_HISTORY_COUNT: process.env.QUEUE_FAIL_HISTORY_COUNT,
140140
NONCE_MAP_COUNT: process.env.NONCE_MAP_COUNT,
141-
CONTRACT_SUBSCRIPTION_BLOCK_RANGE:
142-
process.env.CONTRACT_SUBSCRIPTION_BLOCK_RANGE,
141+
CONTRACT_SUBSCRIPTION_CRON_SCHEDULE_OVERRIDE:
142+
process.env.CONTRACT_SUBSCRIPTION_CRON_SCHEDULE_OVERRIDE,
143143
EXPERIMENTAL__MINE_WORKER_TIMEOUT_SECONDS:
144144
process.env.EXPERIMENTAL__MINE_WORKER_TIMEOUT_SECONDS,
145145
EXPERIMENTAL__MAX_GAS_PRICE_WEI:
Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,34 @@
1-
import cron from "node-cron";
21
import { getBlockTimeSeconds } from "../../shared/utils/indexer/get-block-time";
32
import { logger } from "../../shared/utils/logger";
43
import { handleContractSubscriptions } from "../tasks/chain-indexer";
54
import { env } from "../../shared/utils/env";
5+
import { CronJob } from "cron";
66

77
// @TODO: Move all worker logic to Bullmq to better handle multiple hosts.
8-
export const INDEXER_REGISTRY = {} as Record<number, cron.ScheduledTask>;
8+
export const INDEXER_REGISTRY: Record<number, CronJob> = {};
99

1010
export const addChainIndexer = async (chainId: number) => {
1111
if (INDEXER_REGISTRY[chainId]) {
1212
return;
1313
}
1414

15-
// Estimate the block time in the last 100 blocks. Default to 2 second block times.
16-
let blockTimeSeconds: number;
17-
try {
18-
blockTimeSeconds = await getBlockTimeSeconds(chainId, 100);
19-
} catch (error) {
20-
logger({
21-
service: "worker",
22-
level: "error",
23-
message: `Could not estimate block time for chain ${chainId}`,
24-
error,
25-
});
26-
blockTimeSeconds = 2;
15+
let cronSchedule = env.CONTRACT_SUBSCRIPTION_CRON_SCHEDULE_OVERRIDE;
16+
if (!cronSchedule) {
17+
// Estimate the block time in the last 100 blocks. Default to 2 second block times.
18+
let blockTimeSeconds: number;
19+
try {
20+
blockTimeSeconds = await getBlockTimeSeconds(chainId, 100);
21+
} catch (error) {
22+
logger({
23+
service: "worker",
24+
level: "error",
25+
message: `Could not estimate block time for chain ${chainId}`,
26+
error,
27+
});
28+
blockTimeSeconds = 2;
29+
}
30+
cronSchedule = createScheduleSeconds(blockTimeSeconds);
2731
}
28-
const cronSchedule = createScheduleSeconds({
29-
blockTimeSeconds,
30-
numBlocks: env.CONTRACT_SUBSCRIPTION_BLOCK_RANGE,
31-
});
3232
logger({
3333
service: "worker",
3434
level: "info",
@@ -37,7 +37,7 @@ export const addChainIndexer = async (chainId: number) => {
3737

3838
let inProgress = false;
3939

40-
const task = cron.schedule(cronSchedule, async () => {
40+
const task = new CronJob(cronSchedule, async () => {
4141
if (inProgress) {
4242
return;
4343
}
@@ -58,6 +58,7 @@ export const addChainIndexer = async (chainId: number) => {
5858
});
5959

6060
INDEXER_REGISTRY[chainId] = task;
61+
task.start();
6162
};
6263

6364
export const removeChainIndexer = async (chainId: number) => {
@@ -76,17 +77,7 @@ export const removeChainIndexer = async (chainId: number) => {
7677
delete INDEXER_REGISTRY[chainId];
7778
};
7879

79-
/**
80-
* Returns the cron schedule given the chain's block time and the number of blocks to batch per job.
81-
* Minimum is every 2 seconds.
82-
*/
83-
function createScheduleSeconds({
84-
blockTimeSeconds,
85-
numBlocks,
86-
}: { blockTimeSeconds: number; numBlocks: number }) {
87-
const pollFrequencySeconds = Math.max(
88-
Math.round(blockTimeSeconds * numBlocks),
89-
2,
90-
);
91-
return `*/${pollFrequencySeconds} * * * * *`;
80+
function createScheduleSeconds(blockTimeSeconds: number) {
81+
const pollIntervalSeconds = Math.max(Math.round(blockTimeSeconds), 2);
82+
return `*/${pollIntervalSeconds} * * * * *`;
9283
}

src/worker/listeners/chain-indexer-listener.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
1-
import cron from "node-cron";
1+
import { CronJob } from "cron";
22
import { getConfig } from "../../shared/utils/cache/get-config";
33
import { logger } from "../../shared/utils/logger";
44
import { manageChainIndexers } from "../tasks/manage-chain-indexers";
55

66
let processChainIndexerStarted = false;
7-
let task: cron.ScheduledTask;
7+
let task: CronJob;
88

99
export const chainIndexerListener = async (): Promise<void> => {
1010
const config = await getConfig();
1111
if (!config.indexerListenerCronSchedule) {
1212
return;
1313
}
14+
15+
// Stop the existing task if it exists.
1416
if (task) {
1517
task.stop();
1618
}
1719

18-
task = cron.schedule(config.indexerListenerCronSchedule, async () => {
20+
task = new CronJob(config.indexerListenerCronSchedule, async () => {
1921
if (!processChainIndexerStarted) {
2022
processChainIndexerStarted = true;
2123
await manageChainIndexers();
@@ -28,4 +30,5 @@ export const chainIndexerListener = async (): Promise<void> => {
2830
});
2931
}
3032
});
33+
task.start();
3134
};

yarn.lock

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4131,6 +4131,11 @@
41314131
resolved "https://registry.yarnpkg.com/@types/long/-/long-4.0.2.tgz#b74129719fc8d11c01868010082d483b7545591a"
41324132
integrity sha512-MqTGEo5bj5t157U6fA/BiDynNkn0YknVdh48CMPkTSpFTVmvao5UQmm7uEF6xBEo7qIMAlY/JSleYaE6VOdpaA==
41334133

4134+
"@types/luxon@~3.6.0":
4135+
version "3.6.2"
4136+
resolved "https://registry.yarnpkg.com/@types/luxon/-/luxon-3.6.2.tgz#be6536931801f437eafcb9c0f6d6781f72308041"
4137+
integrity sha512-R/BdP7OxEMc44l2Ex5lSXHoIXTB2JLNa3y2QISIbr58U/YcsffyQrYW//hZSdrfxrjRZj3GcUoxMPGdO8gSYuw==
4138+
41344139
"@types/markdown-it@^14.1.1":
41354140
version "14.1.2"
41364141
resolved "https://registry.yarnpkg.com/@types/markdown-it/-/markdown-it-14.1.2.tgz#57f2532a0800067d9b934f3521429a2e8bfb4c61"
@@ -4154,11 +4159,6 @@
41544159
resolved "https://registry.yarnpkg.com/@types/ms/-/ms-0.7.34.tgz#10964ba0dee6ac4cd462e2795b6bebd407303433"
41554160
integrity sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g==
41564161

4157-
"@types/node-cron@^3.0.8":
4158-
version "3.0.11"
4159-
resolved "https://registry.yarnpkg.com/@types/node-cron/-/node-cron-3.0.11.tgz#70b7131f65038ae63cfe841354c8aba363632344"
4160-
integrity sha512-0ikrnug3/IyneSHqCBeslAhlK2aBfYek1fGo4bP4QnZPmiqSGRK+Oy7ZMisLWkesffJvQ1cqAcBnJC+8+nxIAg==
4161-
41624162
"@types/node@*", "@types/node@>=13.7.0":
41634163
version "22.10.2"
41644164
resolved "https://registry.yarnpkg.com/@types/node/-/node-22.10.2.tgz#a485426e6d1fdafc7b0d4c7b24e2c78182ddabb9"
@@ -5923,6 +5923,14 @@ cron-parser@^4.6.0, cron-parser@^4.9.0:
59235923
dependencies:
59245924
luxon "^3.2.1"
59255925

5926+
cron@^4.3.0:
5927+
version "4.3.0"
5928+
resolved "https://registry.yarnpkg.com/cron/-/cron-4.3.0.tgz#c5a62872f74f72294cf1cadef34c72ad8d8f50b5"
5929+
integrity sha512-ciiYNLfSlF9MrDqnbMdRWFiA6oizSF7kA1osPP9lRzNu0Uu+AWog1UKy7SkckiDY2irrNjeO6qLyKnXC8oxmrw==
5930+
dependencies:
5931+
"@types/luxon" "~3.6.0"
5932+
luxon "~3.6.0"
5933+
59265934
cross-fetch@^3.1.4:
59275935
version "3.1.8"
59285936
resolved "https://registry.yarnpkg.com/cross-fetch/-/cross-fetch-3.1.8.tgz#0327eba65fd68a7d119f8fb2bf9334a1a7956f82"
@@ -8367,6 +8375,11 @@ luxon@^3.2.1:
83678375
resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.5.0.tgz#6b6f65c5cd1d61d1fd19dbf07ee87a50bf4b8e20"
83688376
integrity sha512-rh+Zjr6DNfUYR3bPwJEnuwDdqMbxZW7LOQfUN4B54+Cl+0o5zaU9RJ6bcidfDtC1cWCZXQ+nvX8bf6bAji37QQ==
83698377

8378+
luxon@~3.6.0:
8379+
version "3.6.1"
8380+
resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.6.1.tgz#d283ffc4c0076cb0db7885ec6da1c49ba97e47b0"
8381+
integrity sha512-tJLxrKJhO2ukZ5z0gyjY1zPh3Rh88Ej9P7jNrZiHMUXHae1yvI2imgOZtL1TO8TW6biMMKfTtAOoEJANgtWBMQ==
8382+
83708383
magic-sdk@^13.6.2:
83718384
version "13.6.2"
83728385
resolved "https://registry.yarnpkg.com/magic-sdk/-/magic-sdk-13.6.2.tgz#68766fd9d1805332d2a00e5da1bd30fce251a6ac"
@@ -8728,13 +8741,6 @@ node-addon-api@^7.0.0:
87288741
resolved "https://registry.yarnpkg.com/node-addon-api/-/node-addon-api-7.1.1.tgz#1aba6693b0f255258a049d621329329322aad558"
87298742
integrity sha512-5m3bsyrjFWE1xf7nz7YXdN4udnVtXK6/Yfgn5qnahL6bCkf2yKt4k3nuTKAtT4r3IG8JNR2ncsIMdZuAzJjHQQ==
87308743

8731-
node-cron@^3.0.2:
8732-
version "3.0.3"
8733-
resolved "https://registry.yarnpkg.com/node-cron/-/node-cron-3.0.3.tgz#c4bc7173dd96d96c50bdb51122c64415458caff2"
8734-
integrity sha512-dOal67//nohNgYWb+nWmg5dkFdIwDm8EpeGYMekPMrngV3637lqnX0lbUcCtgibHTz6SEz7DAIjKvKDFYCnO1A==
8735-
dependencies:
8736-
uuid "8.3.2"
8737-
87388744
node-fetch-native@^1.6.4:
87398745
version "1.6.4"
87408746
resolved "https://registry.yarnpkg.com/node-fetch-native/-/node-fetch-native-1.6.4.tgz#679fc8fd8111266d47d7e72c379f1bed9acff06e"
@@ -10890,11 +10896,6 @@ [email protected]:
1089010896
resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.0.0.tgz#bc6ccf91b5ff0ac07bbcdbf1c7c4e150db4dbb6c"
1089110897
integrity sha512-jOXGuXZAWdsTH7eZLtyXMqUb9EcWMGZNbL9YcGBJl4MH4nrxHmZJhEHvyLFrkxo+28uLb/NYRcStH48fnD0Vzw==
1089210898

10893-
10894-
version "8.3.2"
10895-
resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2"
10896-
integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==
10897-
1089810899
1089910900
version "9.0.0"
1090010901
resolved "https://registry.yarnpkg.com/uuid/-/uuid-9.0.0.tgz#592f550650024a38ceb0c562f2f6aa435761efb5"

0 commit comments

Comments
 (0)