diff --git a/.changeset/bright-planes-lay.md b/.changeset/bright-planes-lay.md new file mode 100644 index 0000000000..b31c7d2fc0 --- /dev/null +++ b/.changeset/bright-planes-lay.md @@ -0,0 +1,5 @@ +--- +"@farcaster/hubble": patch +--- + +test: Add e2e test for hubble startup diff --git a/apps/hubble/scripts/clidocs.cjs b/apps/hubble/scripts/clidocs.cjs index a861651b28..8ace31ee79 100644 --- a/apps/hubble/scripts/clidocs.cjs +++ b/apps/hubble/scripts/clidocs.cjs @@ -13,7 +13,7 @@ module.exports = function clidocs() { const regex = /--\w+(-\w+)*/g; let match; - // rome-ignore lint/suspicious/noAssignInExpressions: + // biome-ignore lint/suspicious/noAssignInExpressions: while ((match = regex.exec(helpOutput)) !== null) { optionNames.push(match[0]); } diff --git a/apps/hubble/src/hubble.ts b/apps/hubble/src/hubble.ts index 3bfa352dd8..e0589cb64e 100644 --- a/apps/hubble/src/hubble.ts +++ b/apps/hubble/src/hubble.ts @@ -204,10 +204,10 @@ export interface HubOptions { rebuildSyncTrie?: boolean; /** Commit lock timeout in ms */ - commitLockTimeout: number; + commitLockTimeout?: number; /** Commit lock queue size */ - commitLockMaxPending: number; + commitLockMaxPending?: number; /** Enables the Admin Server */ adminServerEnabled?: boolean; @@ -218,11 +218,7 @@ export interface HubOptions { /** Periodically add casts & reactions for the following test users */ testUsers?: TestUser[]; - /** - * Only allows the Hub to connect to and advertise local IP addresses - * - * Only used by tests - */ + /** Only allows the Hub to connect to and advertise local IP addresses (Only used by tests) */ localIpAddrsOnly?: boolean; /** Cron schedule for prune messages job */ @@ -248,7 +244,7 @@ export interface HubOptions { } /** @returns A randomized string of the format `rocksdb.tmp.*` used for the DB Name */ -const randomDbName = () => { +export const randomDbName = () => { return `rocksdb.tmp.${(new Date().getUTCDate() * Math.random()).toString(36).substring(2)}`; }; @@ -285,10 +281,6 @@ export class Hub implements HubInterface { constructor(options: HubOptions) { this.options = options; - this.rocksDB = new RocksDB(options.rocksDBName ? options.rocksDBName : randomDbName()); - this.gossipNode = new GossipNode(this.options.network); - - this.s3_snapshot_bucket = options.s3SnapshotBucket ?? SNAPSHOT_S3_DEFAULT_BUCKET; if (!options.ethMainnetRpcUrl) { log.warn("No ETH mainnet RPC URL provided, unable to validate ens names"); @@ -327,6 +319,11 @@ export class Hub implements HubInterface { throw new HubError("bad_request.invalid_param", "Invalid fname server url"); } + this.rocksDB = new RocksDB(options.rocksDBName ? options.rocksDBName : randomDbName()); + this.gossipNode = new GossipNode(this.options.network); + + this.s3_snapshot_bucket = options.s3SnapshotBucket ?? SNAPSHOT_S3_DEFAULT_BUCKET; + const eventHandler = new StoreEventHandler(this.rocksDB, { lockMaxPending: options.commitLockMaxPending, lockTimeout: options.commitLockTimeout, diff --git a/apps/hubble/src/storage/engine/index.ts b/apps/hubble/src/storage/engine/index.ts index f8c5e89de4..9b590e83a3 100644 --- a/apps/hubble/src/storage/engine/index.ts +++ b/apps/hubble/src/storage/engine/index.ts @@ -136,7 +136,7 @@ class Engine { try { if (fs.existsSync(workerPath)) { this._validationWorker = new Worker(workerPath); - logger.info({ workerPath }, "created validation worker thread"); + log.info({ workerPath }, "created validation worker thread"); this._validationWorker.on("message", (data) => { const { id, message, errCode, errMessage } = data; @@ -150,14 +150,14 @@ class Engine { resolve(err(new HubError(errCode, errMessage))); } } else { - logger.warn({ id }, "validation worker promise.response not found"); + log.warn({ id }, "validation worker promise.response not found"); } }); } else { - logger.warn({ workerPath }, "validation.worker.js not found, falling back to main thread"); + log.warn({ workerPath }, "validation.worker.js not found, falling back to main thread"); } } catch (e) { - logger.warn({ workerPath, e }, "failed to create validation worker, falling back to main thread"); + log.warn({ workerPath, e }, "failed to create validation worker, falling back to main thread"); } } @@ -176,8 +176,9 @@ class Engine { this._revokeSignerWorker.start(); if (this._validationWorker) { - this._validationWorker.terminate(); + await this._validationWorker.terminate(); this._validationWorker = undefined; + log.info("validation worker thread terminated"); } log.info("engine stopped"); } @@ -209,7 +210,7 @@ class Engine { limiter = getRateLimiterForTotalMessages(storageUnits.value * this._totalPruneSize); const isRateLimited = await isRateLimitedByKey(`${fid}`, limiter); if (isRateLimited) { - logger.warn({ fid }, "rate limit exceeded for FID"); + log.warn({ fid }, "rate limit exceeded for FID"); return err(new HubError("unavailable", `rate limit exceeded for FID ${fid}`)); } } diff --git a/apps/hubble/src/storage/stores/storeEventHandler.ts b/apps/hubble/src/storage/stores/storeEventHandler.ts index 1573a1e7f2..46e858b1a5 100644 --- a/apps/hubble/src/storage/stores/storeEventHandler.ts +++ b/apps/hubble/src/storage/stores/storeEventHandler.ts @@ -156,8 +156,8 @@ const putEventTransaction = (txn: Transaction, event: HubEvent): Transaction => }; export type StoreEventHandlerOptions = { - lockMaxPending?: number; - lockTimeout?: number; + lockMaxPending?: number | undefined; + lockTimeout?: number | undefined; }; class StoreEventHandler extends TypedEmitter { diff --git a/apps/hubble/src/test/e2e/hubbleStartup.test.ts b/apps/hubble/src/test/e2e/hubbleStartup.test.ts new file mode 100644 index 0000000000..6b17b22edb --- /dev/null +++ b/apps/hubble/src/test/e2e/hubbleStartup.test.ts @@ -0,0 +1,143 @@ +import { Factories, FarcasterNetwork, bytesToHexString } from "@farcaster/hub-nodejs"; +import { deployStorageRegistry, testClient } from "../utils.js"; +import { Hub, HubOptions, randomDbName } from "../../hubble.js"; +import { localHttpUrl } from "../constants.js"; +import { sleep } from "../../utils/crypto.js"; +import { FastifyInstance } from "fastify"; +import { DB_DIRECTORY } from "../../storage/db/rocksdb.js"; +import fastify from "fastify"; +import fs from "fs"; +import { Result } from "neverthrow"; + +const TEST_TIMEOUT_SHORT = 10_000; + +export class TestFNameRegistryServer { + private server?: FastifyInstance; + private port = 0; + + public async start(): Promise { + this.server = fastify(); + + this.server.get("/transfers", async (request, reply) => { + reply.send({ transfers: [] }); + }); + + this.server.get("/signer", async (request, reply) => { + reply.send({ signer: bytesToHexString(Factories.EthAddress.build())._unsafeUnwrap() }); + }); + + try { + await this.server.listen({ port: this.port, host: "localhost" }); + const address = this.server.server.address(); + const port = typeof address === "string" ? 0 : address?.port; + return `http://localhost:${port}`; + } catch (err) { + console.log(err); + throw err; + } + } + + public async stop(): Promise { + try { + await this.server?.close(); + } catch (err) { + console.log(err); + } + } +} + +let storageRegistryAddress: `0x${string}`; +let keyRegistryAddress: `0x${string}`; +let idRegistryAddress: `0x${string}`; + +let fnameServerUrl: string; +let fnameServer: TestFNameRegistryServer; +let rocksDBName: string; + +let hubOptions: HubOptions; + +describe("hubble startuup tests", () => { + beforeEach(async () => { + const { contractAddress: storageAddr } = await deployStorageRegistry(); + if (!storageAddr) throw new Error("Failed to deploy StorageRegistry contract"); + storageRegistryAddress = storageAddr; + + idRegistryAddress = bytesToHexString(Factories.EthAddress.build())._unsafeUnwrap(); + keyRegistryAddress = bytesToHexString(Factories.EthAddress.build())._unsafeUnwrap(); + + fnameServer = new TestFNameRegistryServer(); + fnameServerUrl = await fnameServer.start(); + + rocksDBName = randomDbName(); + + hubOptions = { + network: FarcasterNetwork.DEVNET, + l2StorageRegistryAddress: storageRegistryAddress, + l2IdRegistryAddress: idRegistryAddress, + l2KeyRegistryAddress: keyRegistryAddress, + l2RpcUrl: localHttpUrl, + ethMainnetRpcUrl: localHttpUrl, + fnameServerUrl, + rocksDBName, + announceIp: "127.0.0.1", + disableSnapshotSync: true, + }; + }); + + afterAll(async () => { + await fnameServer.stop(); + + // rm -rf the rocksdb directory + fs.rm(`${DB_DIRECTORY}/${rocksDBName}`, { recursive: true }, (err) => {}); + }); + + test( + "Starts up with no errors", + async () => { + const client = testClient; + let hub; + + try { + hub = new Hub(hubOptions); + + await hub.start(); // If exception is thrown, test errors out + + // Sleep for 1 sec + await sleep(1000); + + const hubState = await hub.getHubState(); + expect(hubState.isOk()).toBe(true); + } finally { + await hub?.teardown(); + } + }, + TEST_TIMEOUT_SHORT, + ); + + test("Needs a valid fname server url", async () => { + const hub = Result.fromThrowable( + () => new Hub({ ...hubOptions, fnameServerUrl: "" }), + (e) => e as Error, + )(); + expect(hub.isErr()).toBe(true); + expect(hub._unsafeUnwrapErr().message).toContain("Invalid fname server url"); + }); + + test("Needs a valid l2 rpc url", async () => { + const hub = Result.fromThrowable( + () => new Hub({ ...hubOptions, l2RpcUrl: "" }), + (e) => e as Error, + )(); + expect(hub.isErr()).toBe(true); + expect(hub._unsafeUnwrapErr().message).toContain("Invalid l2 rpc url"); + }); + + test("Needs a valid eth mainnet rpc url", async () => { + const hub = Result.fromThrowable( + () => new Hub({ ...hubOptions, ethMainnetRpcUrl: "" }), + (e) => e as Error, + )(); + expect(hub.isErr()).toBe(true); + expect(hub._unsafeUnwrapErr().message).toContain("Invalid eth mainnet rpc url"); + }); +});