Skip to content

Commit

Permalink
fix: health checks for worker and fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
vasyl-ivanchuk committed Jan 17, 2025
1 parent 20d2444 commit 6824c13
Show file tree
Hide file tree
Showing 16 changed files with 323 additions and 79 deletions.
26 changes: 15 additions & 11 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion packages/data-fetcher/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ RPC_BATCH_MAX_COUNT=10
RPC_BATCH_MAX_SIZE_BYTES=1048576
RPC_BATCH_STALL_TIME_MS=0

MAX_BLOCKS_BATCH_SIZE=20
MAX_BLOCKS_BATCH_SIZE=20

RPC_HEALTH_CHECK_TIMEOUT_MS=20_000
2 changes: 2 additions & 0 deletions packages/data-fetcher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
"test:e2e": "jest --config ./test/jest-e2e.json"
},
"dependencies": {
"@nestjs/axios": "^3.1.3",
"@nestjs/common": "^9.0.0",
"@nestjs/config": "^2.2.0",
"@nestjs/core": "^9.0.0",
"@nestjs/platform-express": "^9.0.0",
"@nestjs/terminus": "^9.1.2",
"@willsoto/nestjs-prometheus": "^4.7.0",
"axios": "^1.7.9",
"ethers": "6.13.4",
"nest-winston": "^1.7.0",
"prom-client": "^14.1.0",
Expand Down
3 changes: 3 additions & 0 deletions packages/data-fetcher/src/config.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ describe("config", () => {
},
maxBlocksBatchSize: 20,
gracefulShutdownTimeoutMs: 0,
healthChecks: {
rpcHealthCheckTimeoutMs: 20_000,
},
};
});

Expand Down
4 changes: 4 additions & 0 deletions packages/data-fetcher/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export default () => {
RPC_BATCH_STALL_TIME_MS,
MAX_BLOCKS_BATCH_SIZE,
GRACEFUL_SHUTDOWN_TIMEOUT_MS,
RPC_HEALTH_CHECK_TIMEOUT_MS,
} = process.env;

return {
Expand Down Expand Up @@ -42,5 +43,8 @@ export default () => {
},
maxBlocksBatchSize: parseInt(MAX_BLOCKS_BATCH_SIZE, 10) || 20,
gracefulShutdownTimeoutMs: parseInt(GRACEFUL_SHUTDOWN_TIMEOUT_MS, 10) || 0,
healthChecks: {
rpcHealthCheckTimeoutMs: parseInt(RPC_HEALTH_CHECK_TIMEOUT_MS, 10) || 20_000,
},
};
};
3 changes: 2 additions & 1 deletion packages/data-fetcher/src/health/health.module.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Module } from "@nestjs/common";
import { TerminusModule } from "@nestjs/terminus";
import { HttpModule } from "@nestjs/axios";
import { HealthController } from "./health.controller";
import { JsonRpcHealthIndicator } from "./jsonRpcProvider.health";

@Module({
controllers: [HealthController],
imports: [TerminusModule],
imports: [TerminusModule, HttpModule],
providers: [JsonRpcHealthIndicator],
})
export class HealthModule {}
105 changes: 80 additions & 25 deletions packages/data-fetcher/src/health/jsonRpcProvider.health.spec.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,111 @@
import { Test, TestingModule } from "@nestjs/testing";
import { Logger } from "@nestjs/common";
import { mock } from "jest-mock-extended";
import { HealthCheckError } from "@nestjs/terminus";
import { JsonRpcProviderBase } from "../rpcProvider";
import { JsonRpcHealthIndicator } from "./jsonRpcProvider.health";
import { ConfigService } from "@nestjs/config";
import { HttpService } from "@nestjs/axios";
import { of, throwError } from "rxjs";
import { AxiosError } from "axios";

describe("JsonRpcHealthIndicator", () => {
const healthIndicatorKey = "rpcProvider";
let jsonRpcProviderMock: JsonRpcProviderBase;
let jsonRpcHealthIndicator: JsonRpcHealthIndicator;
let httpService: HttpService;
let configService: ConfigService;

beforeEach(async () => {
jsonRpcProviderMock = mock<JsonRpcProviderBase>();

const getHealthIndicator = async () => {
const app: TestingModule = await Test.createTestingModule({
providers: [
JsonRpcHealthIndicator,
{
provide: JsonRpcProviderBase,
useValue: jsonRpcProviderMock,
},
{
provide: HttpService,
useValue: httpService,
},
{
provide: ConfigService,
useValue: configService,
},
],
}).compile();

jsonRpcHealthIndicator = app.get<JsonRpcHealthIndicator>(JsonRpcHealthIndicator);
app.useLogger(mock<Logger>());
return app.get<JsonRpcHealthIndicator>(JsonRpcHealthIndicator);
};

beforeEach(async () => {
jsonRpcProviderMock = mock<JsonRpcProviderBase>();

httpService = mock<HttpService>({
post: jest.fn(),
});

configService = mock<ConfigService>({
get: jest.fn().mockImplementation((key: string) => {
if (key === "blockchain.rpcUrl") return "http://localhost:3050";
if (key === "healthChecks.rpcHealthCheckTimeoutMs") return 5000;
return null;
}),
});

jsonRpcHealthIndicator = await getHealthIndicator();
});

describe("isHealthy", () => {
describe("when rpcProvider is open", () => {
beforeEach(() => {
jest.spyOn(jsonRpcProviderMock, "getState").mockReturnValueOnce("open");
});
const rpcRequest = {
id: 1,
jsonrpc: "2.0",
method: "eth_chainId",
params: [],
};

it("returns OK health indicator result", async () => {
const result = await jsonRpcHealthIndicator.isHealthy(healthIndicatorKey);
expect(result).toEqual({ [healthIndicatorKey]: { rpcProviderState: "open", status: "up" } });
it("returns healthy status when RPC responds successfully", async () => {
(httpService.post as jest.Mock).mockReturnValueOnce(of({ data: { result: "0x1" } }));
const result = await jsonRpcHealthIndicator.isHealthy("jsonRpcProvider");
expect(result).toEqual({
jsonRpcProvider: {
status: "up",
},
});
expect(httpService.post).toHaveBeenCalledWith("http://localhost:3050", rpcRequest, { timeout: 5000 });
});

describe("when rpcProvider is closed", () => {
beforeEach(() => {
jest.spyOn(jsonRpcProviderMock, "getState").mockReturnValueOnce("closed");
});
it("throws HealthCheckError when RPC request fails", async () => {
const error = new AxiosError();
error.response = {
status: 503,
data: "Service Unavailable",
} as any;

it("throws HealthCheckError error", async () => {
expect.assertions(2);
try {
await jsonRpcHealthIndicator.isHealthy(healthIndicatorKey);
} catch (error) {
expect(error).toBeInstanceOf(HealthCheckError);
expect(error.message).toBe("JSON RPC provider is not in open state");
}
(httpService.post as jest.Mock).mockReturnValueOnce(throwError(() => error));
await expect(jsonRpcHealthIndicator.isHealthy("jsonRpcProvider")).rejects.toThrow();
expect(httpService.post).toHaveBeenCalledWith("http://localhost:3050", rpcRequest, { timeout: 5000 });
});

it("throws HealthCheckError when RPC request times out", async () => {
const error = new AxiosError();
error.code = "ECONNABORTED";

(httpService.post as jest.Mock).mockReturnValueOnce(throwError(() => error));
await expect(jsonRpcHealthIndicator.isHealthy("jsonRpcProvider")).rejects.toThrow();
expect(httpService.post).toHaveBeenCalledWith("http://localhost:3050", rpcRequest, { timeout: 5000 });
});

it("should use configured timeout from config service", async () => {
(configService.get as jest.Mock).mockImplementation((key: string) => {
if (key === "blockchain.rpcUrl") return "http://localhost:3050";
if (key === "healthChecks.rpcHealthCheckTimeoutMs") return 10000;
return null;
});
jsonRpcHealthIndicator = await getHealthIndicator();

(httpService.post as jest.Mock).mockReturnValueOnce(of({ data: { result: "0x1" } }));
await jsonRpcHealthIndicator.isHealthy("jsonRpcProvider");
expect(httpService.post).toHaveBeenCalledWith("http://localhost:3050", rpcRequest, { timeout: 10000 });
});
});
});
54 changes: 48 additions & 6 deletions packages/data-fetcher/src/health/jsonRpcProvider.health.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,64 @@
import { Injectable } from "@nestjs/common";
import { HealthIndicator, HealthIndicatorResult, HealthCheckError } from "@nestjs/terminus";
import { JsonRpcProviderBase } from "../rpcProvider";
import { ConfigService } from "@nestjs/config";
import { Logger } from "@nestjs/common";
import { HttpService } from "@nestjs/axios";
import { catchError, firstValueFrom, of } from "rxjs";
import { AxiosError } from "axios";

@Injectable()
export class JsonRpcHealthIndicator extends HealthIndicator {
constructor(private readonly provider: JsonRpcProviderBase) {
private readonly rpcUrl: string;
private readonly healthCheckTimeoutMs: number;
private readonly logger: Logger;

constructor(configService: ConfigService, private readonly httpService: HttpService) {
super();
this.logger = new Logger(JsonRpcHealthIndicator.name);
this.rpcUrl = configService.get<string>("blockchain.rpcUrl");
this.healthCheckTimeoutMs = configService.get<number>("healthChecks.rpcHealthCheckTimeoutMs");
}

async isHealthy(key: string): Promise<HealthIndicatorResult> {
const rpcProviderState = this.provider.getState();
const isHealthy = rpcProviderState === "open";
const result = this.getStatus(key, isHealthy, { rpcProviderState });
let isHealthy = true;
try {
// Check RPC health with a pure HTTP request to remove SDK out of the picture
// and avoid any SDK-specific issues.
// Use eth_chainId call as it is the lightest one and return a static value from the memory.
await firstValueFrom(
this.httpService
.post(
this.rpcUrl,
{
id: 1,
jsonrpc: "2.0",
method: "eth_chainId",
params: [],
},
{ timeout: this.healthCheckTimeoutMs }
)
.pipe(
catchError((error: AxiosError) => {
this.logger.error({
message: `Failed to ping RPC`,
stack: error.stack,
status: error.response?.status,
response: error.response?.data,
});
throw error;
})
)
);
} catch {
isHealthy = false;
}

const result = this.getStatus(key, isHealthy, { status: isHealthy ? "up" : "down" });

if (isHealthy) {
return result;
}

throw new HealthCheckError("JSON RPC provider is not in open state", result);
throw new HealthCheckError("JSON RPC provider is down or not reachable", result);
}
}
3 changes: 3 additions & 0 deletions packages/worker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ RPC_BATCH_STALL_TIME_MS=0
COLLECT_DB_CONNECTION_POOL_METRICS_INTERVAL=10000
COLLECT_BLOCKS_TO_PROCESS_METRIC_INTERVAL=10000

RPC_HEALTH_CHECK_TIMEOUT_MS=20000
DB_HEALTH_CHECK_TIMEOUT_MS=20000

DISABLE_MISSING_BLOCKS_METRIC=false
CHECK_MISSING_BLOCKS_METRIC_INTERVAL=86400000

Expand Down
8 changes: 8 additions & 0 deletions packages/worker/src/config.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ describe("config", () => {
interval: 86_400_000,
},
},
healthChecks: {
rpcHealthCheckTimeoutMs: 20_000,
dbHealthCheckTimeoutMs: 20_000,
},
};
});

Expand Down Expand Up @@ -123,6 +127,10 @@ describe("config", () => {
interval: 86_400_000,
},
},
healthChecks: {
rpcHealthCheckTimeoutMs: 20_000,
dbHealthCheckTimeoutMs: 20_000,
},
});
});

Expand Down
6 changes: 6 additions & 0 deletions packages/worker/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ export default () => {
COINGECKO_API_KEY,
DISABLE_MISSING_BLOCKS_METRIC,
CHECK_MISSING_BLOCKS_METRIC_INTERVAL,
RPC_HEALTH_CHECK_TIMEOUT_MS,
DB_HEALTH_CHECK_TIMEOUT_MS,
} = process.env;

return {
Expand Down Expand Up @@ -97,5 +99,9 @@ export default () => {
interval: parseInt(CHECK_MISSING_BLOCKS_METRIC_INTERVAL, 10) || 86_400_000, // 1 day
},
},
healthChecks: {
rpcHealthCheckTimeoutMs: parseInt(RPC_HEALTH_CHECK_TIMEOUT_MS, 10) || 20_000,
dbHealthCheckTimeoutMs: parseInt(DB_HEALTH_CHECK_TIMEOUT_MS, 10) || 20_000,
},
};
};
Loading

0 comments on commit 6824c13

Please sign in to comment.