Skip to content

Commit

Permalink
test: Add e2e test for hubble startup (farcasterxyz#1389)
Browse files Browse the repository at this point in the history
  • Loading branch information
adityapk00 authored Sep 15, 2023
1 parent 7e2a66e commit 7cbd77e
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 21 deletions.
5 changes: 5 additions & 0 deletions .changeset/bright-planes-lay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

test: Add e2e test for hubble startup
2 changes: 1 addition & 1 deletion apps/hubble/scripts/clidocs.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module.exports = function clidocs() {
const regex = /--\w+(-\w+)*/g;
let match;

// rome-ignore lint/suspicious/noAssignInExpressions: <explanation>
// biome-ignore lint/suspicious/noAssignInExpressions: <explanation>
while ((match = regex.exec(helpOutput)) !== null) {
optionNames.push(match[0]);
}
Expand Down
21 changes: 9 additions & 12 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ export interface HubOptions {
rebuildSyncTrie?: boolean;

/** Commit lock timeout in ms */
commitLockTimeout: number;
commitLockTimeout?: number;

/** Commit lock queue size */
commitLockMaxPending: number;
commitLockMaxPending?: number;

/** Enables the Admin Server */
adminServerEnabled?: boolean;
Expand All @@ -218,11 +218,7 @@ export interface HubOptions {
/** Periodically add casts & reactions for the following test users */
testUsers?: TestUser[];

/**
* Only allows the Hub to connect to and advertise local IP addresses
*
* Only used by tests
*/
/** Only allows the Hub to connect to and advertise local IP addresses (Only used by tests) */
localIpAddrsOnly?: boolean;

/** Cron schedule for prune messages job */
Expand All @@ -248,7 +244,7 @@ export interface HubOptions {
}

/** @returns A randomized string of the format `rocksdb.tmp.*` used for the DB Name */
const randomDbName = () => {
export const randomDbName = () => {
return `rocksdb.tmp.${(new Date().getUTCDate() * Math.random()).toString(36).substring(2)}`;
};

Expand Down Expand Up @@ -285,10 +281,6 @@ export class Hub implements HubInterface {

constructor(options: HubOptions) {
this.options = options;
this.rocksDB = new RocksDB(options.rocksDBName ? options.rocksDBName : randomDbName());
this.gossipNode = new GossipNode(this.options.network);

this.s3_snapshot_bucket = options.s3SnapshotBucket ?? SNAPSHOT_S3_DEFAULT_BUCKET;

if (!options.ethMainnetRpcUrl) {
log.warn("No ETH mainnet RPC URL provided, unable to validate ens names");
Expand Down Expand Up @@ -327,6 +319,11 @@ export class Hub implements HubInterface {
throw new HubError("bad_request.invalid_param", "Invalid fname server url");
}

this.rocksDB = new RocksDB(options.rocksDBName ? options.rocksDBName : randomDbName());
this.gossipNode = new GossipNode(this.options.network);

this.s3_snapshot_bucket = options.s3SnapshotBucket ?? SNAPSHOT_S3_DEFAULT_BUCKET;

const eventHandler = new StoreEventHandler(this.rocksDB, {
lockMaxPending: options.commitLockMaxPending,
lockTimeout: options.commitLockTimeout,
Expand Down
13 changes: 7 additions & 6 deletions apps/hubble/src/storage/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class Engine {
try {
if (fs.existsSync(workerPath)) {
this._validationWorker = new Worker(workerPath);
logger.info({ workerPath }, "created validation worker thread");
log.info({ workerPath }, "created validation worker thread");

this._validationWorker.on("message", (data) => {
const { id, message, errCode, errMessage } = data;
Expand All @@ -150,14 +150,14 @@ class Engine {
resolve(err(new HubError(errCode, errMessage)));
}
} else {
logger.warn({ id }, "validation worker promise.response not found");
log.warn({ id }, "validation worker promise.response not found");
}
});
} else {
logger.warn({ workerPath }, "validation.worker.js not found, falling back to main thread");
log.warn({ workerPath }, "validation.worker.js not found, falling back to main thread");
}
} catch (e) {
logger.warn({ workerPath, e }, "failed to create validation worker, falling back to main thread");
log.warn({ workerPath, e }, "failed to create validation worker, falling back to main thread");
}
}

Expand All @@ -176,8 +176,9 @@ class Engine {
this._revokeSignerWorker.start();

if (this._validationWorker) {
this._validationWorker.terminate();
await this._validationWorker.terminate();
this._validationWorker = undefined;
log.info("validation worker thread terminated");
}
log.info("engine stopped");
}
Expand Down Expand Up @@ -209,7 +210,7 @@ class Engine {
limiter = getRateLimiterForTotalMessages(storageUnits.value * this._totalPruneSize);
const isRateLimited = await isRateLimitedByKey(`${fid}`, limiter);
if (isRateLimited) {
logger.warn({ fid }, "rate limit exceeded for FID");
log.warn({ fid }, "rate limit exceeded for FID");
return err(new HubError("unavailable", `rate limit exceeded for FID ${fid}`));
}
}
Expand Down
4 changes: 2 additions & 2 deletions apps/hubble/src/storage/stores/storeEventHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ const putEventTransaction = (txn: Transaction, event: HubEvent): Transaction =>
};

export type StoreEventHandlerOptions = {
lockMaxPending?: number;
lockTimeout?: number;
lockMaxPending?: number | undefined;
lockTimeout?: number | undefined;
};

class StoreEventHandler extends TypedEmitter<StoreEvents> {
Expand Down
143 changes: 143 additions & 0 deletions apps/hubble/src/test/e2e/hubbleStartup.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { Factories, FarcasterNetwork, bytesToHexString } from "@farcaster/hub-nodejs";
import { deployStorageRegistry, testClient } from "../utils.js";
import { Hub, HubOptions, randomDbName } from "../../hubble.js";
import { localHttpUrl } from "../constants.js";
import { sleep } from "../../utils/crypto.js";
import { FastifyInstance } from "fastify";
import { DB_DIRECTORY } from "../../storage/db/rocksdb.js";
import fastify from "fastify";
import fs from "fs";
import { Result } from "neverthrow";

const TEST_TIMEOUT_SHORT = 10_000;

export class TestFNameRegistryServer {
private server?: FastifyInstance;
private port = 0;

public async start(): Promise<string> {
this.server = fastify();

this.server.get("/transfers", async (request, reply) => {
reply.send({ transfers: [] });
});

this.server.get("/signer", async (request, reply) => {
reply.send({ signer: bytesToHexString(Factories.EthAddress.build())._unsafeUnwrap() });
});

try {
await this.server.listen({ port: this.port, host: "localhost" });
const address = this.server.server.address();
const port = typeof address === "string" ? 0 : address?.port;
return `http://localhost:${port}`;
} catch (err) {
console.log(err);
throw err;
}
}

public async stop(): Promise<void> {
try {
await this.server?.close();
} catch (err) {
console.log(err);
}
}
}

let storageRegistryAddress: `0x${string}`;
let keyRegistryAddress: `0x${string}`;
let idRegistryAddress: `0x${string}`;

let fnameServerUrl: string;
let fnameServer: TestFNameRegistryServer;
let rocksDBName: string;

let hubOptions: HubOptions;

describe("hubble startuup tests", () => {
beforeEach(async () => {
const { contractAddress: storageAddr } = await deployStorageRegistry();
if (!storageAddr) throw new Error("Failed to deploy StorageRegistry contract");
storageRegistryAddress = storageAddr;

idRegistryAddress = bytesToHexString(Factories.EthAddress.build())._unsafeUnwrap();
keyRegistryAddress = bytesToHexString(Factories.EthAddress.build())._unsafeUnwrap();

fnameServer = new TestFNameRegistryServer();
fnameServerUrl = await fnameServer.start();

rocksDBName = randomDbName();

hubOptions = {
network: FarcasterNetwork.DEVNET,
l2StorageRegistryAddress: storageRegistryAddress,
l2IdRegistryAddress: idRegistryAddress,
l2KeyRegistryAddress: keyRegistryAddress,
l2RpcUrl: localHttpUrl,
ethMainnetRpcUrl: localHttpUrl,
fnameServerUrl,
rocksDBName,
announceIp: "127.0.0.1",
disableSnapshotSync: true,
};
});

afterAll(async () => {
await fnameServer.stop();

// rm -rf the rocksdb directory
fs.rm(`${DB_DIRECTORY}/${rocksDBName}`, { recursive: true }, (err) => {});
});

test(
"Starts up with no errors",
async () => {
const client = testClient;
let hub;

try {
hub = new Hub(hubOptions);

await hub.start(); // If exception is thrown, test errors out

// Sleep for 1 sec
await sleep(1000);

const hubState = await hub.getHubState();
expect(hubState.isOk()).toBe(true);
} finally {
await hub?.teardown();
}
},
TEST_TIMEOUT_SHORT,
);

test("Needs a valid fname server url", async () => {
const hub = Result.fromThrowable(
() => new Hub({ ...hubOptions, fnameServerUrl: "" }),
(e) => e as Error,
)();
expect(hub.isErr()).toBe(true);
expect(hub._unsafeUnwrapErr().message).toContain("Invalid fname server url");
});

test("Needs a valid l2 rpc url", async () => {
const hub = Result.fromThrowable(
() => new Hub({ ...hubOptions, l2RpcUrl: "" }),
(e) => e as Error,
)();
expect(hub.isErr()).toBe(true);
expect(hub._unsafeUnwrapErr().message).toContain("Invalid l2 rpc url");
});

test("Needs a valid eth mainnet rpc url", async () => {
const hub = Result.fromThrowable(
() => new Hub({ ...hubOptions, ethMainnetRpcUrl: "" }),
(e) => e as Error,
)();
expect(hub.isErr()).toBe(true);
expect(hub._unsafeUnwrapErr().message).toContain("Invalid eth mainnet rpc url");
});
});

0 comments on commit 7cbd77e

Please sign in to comment.