[wip] feat: Enable HTTP API Server (farcasterxyz#1405)
adityapk00 authored Sep 22, 2023
1 parent 4b99edd commit aa6553b
Showing 27 changed files with 2,238 additions and 216 deletions.
5 changes: 5 additions & 0 deletions .changeset/brave-coins-yell.md
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

feat: Enable HTTP API server
1 change: 1 addition & 0 deletions README.md
@@ -7,6 +7,7 @@ This monorepo contains Hubble, an official Farcaster Hub implementation, and oth

1. To run Hubble, see the [Hubble docs](https://www.thehubble.xyz/).
1. To use Hubble, see the [hub-nodejs docs](./packages/hub-nodejs/docs/README.md).
1. To use the HTTP API to read Hubble data, see the [HTTP API docs](https://www.thehubble.xyz/docs/httpapi.html).

## Packages

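As a quick illustration of the README addition above, a minimal sketch of reading hub data over the new HTTP API. The base URL, port, and the /v1/info route are assumptions for illustration; the actual endpoints are listed in the linked HTTP API docs.

// Minimal sketch (TypeScript, Node 18+ global fetch): query a locally running hub.
// HUB_HTTP_URL and the /v1/info path are assumptions, not taken from this commit.
const HUB_HTTP_URL = process.env.HUB_HTTP_URL ?? "http://localhost:2281";

async function getHubInfo(): Promise<unknown> {
  const res = await fetch(`${HUB_HTTP_URL}/v1/info`);
  if (!res.ok) {
    throw new Error(`hub responded with HTTP ${res.status}`);
  }
  return res.json();
}

getHubInfo()
  .then((info) => console.log(info))
  .catch((err) => console.error(err));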
8 changes: 6 additions & 2 deletions apps/hubble/package.json
@@ -57,7 +57,11 @@
"prettier-config-custom": "*",
"progress": "~2.0.3",
"ts-mockito": "~2.6.1",
"tsx": "~3.12.5"
"tsx": "~3.12.5",
"remark": "^15.0.1",
"remark-gfm": "^4.0.0",
"remark-parse": "^11.0.0",
"unified": "^11.0.3"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.400.0",
@@ -82,9 +86,9 @@
"axios": "^1.4.0",
"cli-progress": "^3.12.0",
"commander": "~10.0.0",
"libp2p": "0.43.4",
"fastify": "^4.22.0",
"hot-shots": "^10.0.0",
"libp2p": "0.43.4",
"neverthrow": "~6.0.0",
"node-cron": "~3.0.2",
"pino": "~8.11.0",
191 changes: 191 additions & 0 deletions apps/hubble/scripts/httpapidocs.js
@@ -0,0 +1,191 @@
import { readFileSync } from "fs";
import { join, dirname } from "path";
import { fileURLToPath } from "url";
import { remark } from "remark";
import remarkParse from "remark-parse";
import remarkGfm from "remark-gfm";

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);

/**
* This linter checks that each of the HTTP API server endpoints is documented properly.
* This will check:
* 1. That all endpoints have a "@doc-tag:" comment in the httpServer.ts file
* 2. Make sure that all endpoints have a corresponding section in the HTTP API docs
* 3. Make sure that all parameters for every endpoint are documented in the HTTP API docs
* under the corresponding section
* 4. Make sure that all parameters that are documented are specified in the @doc-tag comment
* for that endpoint in the httpServer.ts file
*/
export function httpapidocs() {
function extractUniqueEndpoints(fileContent) {
const endpointSet = new Set();
const regex = /\"\/v\d+\/([a-zA-Z0-9_]+)([a-zA-Z0-9_\/:]*)\"/g;
let match;

// biome-ignore lint/suspicious/noAssignInExpressions: assignment in the while condition drives the regex.exec() loop
while ((match = regex.exec(fileContent)) !== null) {
endpointSet.add(match[1]);
}

return [...endpointSet];
}

function findMissingEndpointsInDocs(endpoints, docsContent) {
const missingEndpoints = endpoints.filter((endpoint) => !docsContent.includes(`### ${endpoint}`));

return missingEndpoints;
}

function extractDocTags(fileContent) {
const endpointMap = {};
const regex = /\/\/ @doc-tag:? \/([a-zA-Z0-9_]+)\??([^ \n]*)/;
const lines = fileContent.split("\n");

lines.forEach((line, index) => {
const match = line.match(regex);
if (match) {
const endpoint = match[1];
const queryParams = match[2]
.split("&")
.filter(Boolean)
.map((param) => param.split("=")[0]);

if (!endpointMap[endpoint]) {
endpointMap[endpoint] = { params: new Set(), lineNumbers: [] };
}

queryParams.forEach((param) => endpointMap[endpoint].params.add(param));
endpointMap[endpoint].lineNumbers.push(index + 1);
}
});

// Convert sets to arrays for the final result
for (const endpoint in endpointMap) {
endpointMap[endpoint].params = [...endpointMap[endpoint].params];
}

return endpointMap;
}

function findMissingDocTags(endpointMap, endpoints) {
const missingDocTags = [];

for (const endpoint of endpoints) {
if (!endpointMap[endpoint]) {
missingDocTags.push(endpoint);
}
}

return missingDocTags;
}

function parseMarkdown(markdownContent) {
const processor = remark().use(remarkParse).use(remarkGfm);
const tree = processor.parse(markdownContent);

return tree;
}

function getParametersForEndpoint(endpoint, tree) {
let foundEndpoint = false;
let parameters = [];
let line = 0;

tree.children.forEach((node, index) => {
if (node.type === "heading" && node.children[0].value === endpoint) {
foundEndpoint = true;
}

if (foundEndpoint && node.type === "table") {
parameters = node.children
.slice(1)
.map((row) => row.children[0].children[0]?.value)
.filter((p) => p !== undefined);
line = node.position.start.line;
foundEndpoint = false; // Reset to stop looking after finding the table
}
});

return { parameters, line };
}

function checkParametersInDocs(docTags, tree) {
// For each endpoint, check if the parameters in the doc tags are present in the docs
let anyError = false;
for (const endpoint in docTags) {
const { parameters, line } = getParametersForEndpoint(endpoint, tree);

for (const param of docTags[endpoint].params) {
if (!parameters.includes(param)) {
anyError = true;
console.error(
`Parameter "${param}" specified in the @doc-tag (on httpServer.ts: line ${docTags[
endpoint
].lineNumbers.join(
", ",
)}) is missing documentation in the parameters table (on httpapi.md: line ${line}) for endpoint "${endpoint}"`,
);
}
}

// Check the other direction: every parameter documented in the table must also appear in the @doc-tag
for (const param of parameters) {
if (!docTags[endpoint].params.includes(param)) {
anyError = true;
console.error(
`Parameter "${param}" is documented in the parameters table (on httpapi.md: line ${line}) for endpoint "${endpoint}" but is not specified in the @doc-tag (on httpServer.ts: line ${docTags[
endpoint
].lineNumbers.join(", ")})`,
);
}
}
}

return anyError;
}

const apiFilePath = join(__dirname, "../src/rpc/httpServer.ts");
const contents = readFileSync(apiFilePath, "utf-8");

const endpoints = extractUniqueEndpoints(contents);
const docTags = extractDocTags(contents);

const docFilePath = join(__dirname, "../www/docs/docs/httpapi.md");
const docsContent = readFileSync(docFilePath, "utf-8");
const tree = parseMarkdown(docsContent);

// console.log(getParametersForEndpoint("castsByParent", tree));

// First, get all endpoints that are not documented in the docs
const missingEndpoints = findMissingEndpointsInDocs(endpoints, docsContent);

// Next, get all endpoints that are documented but are missing doc tags
const missingDocTags = findMissingDocTags(docTags, endpoints);

// console.log(docTags);
// Last, check for parameters
let anyError = checkParametersInDocs(docTags, tree);

if (missingEndpoints.length > 0) {
console.error(
"The following endpoints specified in httpServer.ts are missing from the HTTP API docs in httpapi.md:",
);
console.error(missingEndpoints);
anyError = true;
}

if (missingDocTags.length > 0) {
console.error("The following endpoints specified in httpServer.ts are missing doc tags:");
console.error(missingDocTags);
anyError = true;
}

if (anyError) {
console.log("❌ HTTP API docs are not up to date");
process.exit(1);
} else {
console.log("✨ HTTP API docs are up to date");
}
}
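For reference, a hypothetical annotation/documentation pair that the linter above would accept. The endpoint and parameter names are illustrative, not actual routes from this commit:

// In httpServer.ts: a @doc-tag comment next to the route, registered under a "/v1/..." path
// @doc-tag: /exampleEndpoint?fid=&pageSize=
// ... route registered as "/v1/exampleEndpoint" ...

<!-- In www/docs/docs/httpapi.md: a matching "### exampleEndpoint" section with a parameters table -->
### exampleEndpoint

| Parameter | Description                    |
| --------- | ------------------------------ |
| fid       | The FID to query               |
| pageSize  | Maximum number of results/page |

The linter extracts "exampleEndpoint" and its params (fid, pageSize) from the comment, then checks that the heading exists and that the first column of the nearest following table lists exactly those parameters.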
6 changes: 3 additions & 3 deletions apps/hubble/scripts/linter.cjs
@@ -11,10 +11,10 @@

async function executeAll() {
const grafana = require("./grafanadash.cjs");
await grafana();

const clidocs = require("./clidocs.cjs");
await clidocs();
const { httpapidocs } = await import("./httpapidocs.js");

await Promise.all([grafana(), clidocs(), httpapidocs()]);
}

executeAll();
6 changes: 4 additions & 2 deletions apps/hubble/src/cli.ts
@@ -76,8 +76,6 @@ app
.option("--hub-operator-fid <fid>", "The FID of the hub operator")
.option("-c, --config <filepath>", "Path to the config file.")
.option("--db-name <name>", "The name of the RocksDB instance. (default: rocks.hub._default)")
.option("--admin-server-enabled", "Enable the admin server. (default: disabled)")
.option("--admin-server-host <host>", "The host the admin server should listen on. (default: '127.0.0.1')")
.option("--process-file-prefix <prefix>", 'Prefix for file to which hub process number is written. (default: "")')

// Ethereum Options
@@ -127,6 +125,9 @@ app
"RPC rate limit for peers specified in rpm. Set to -1 for none. (default: 20k/min)",
)
.option("--rpc-subscribe-per-ip-limit <number>", "Maximum RPC subscriptions per IP address. (default: 4)")
.option("--admin-server-enabled", "Enable the admin server. (default: disabled)")
.option("--admin-server-host <host>", "The host the admin server should listen on. (default: '127.0.0.1')")
.option("--http-server-disabled", "Disable the HTTP server. (default: enabled)")

// Snapshots
.option("--enable-snapshot-to-s3", "Enable daily snapshots to be uploaded to S3. (default: disabled)")
@@ -512,6 +513,7 @@ app
commitLockTimeout: cliOptions.commitLockTimeout ?? hubConfig.commitLockTimeout,
commitLockMaxPending: cliOptions.commitLockMaxPending ?? hubConfig.commitLockMaxPending,
adminServerEnabled: cliOptions.adminServerEnabled ?? hubConfig.adminServerEnabled,
httpServerDisabled: cliOptions.httpServerDisabled ?? hubConfig.httpServerDisabled ?? false,
adminServerHost: cliOptions.adminServerHost ?? hubConfig.adminServerHost,
testUsers: testUsers,
directPeers,
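The new --http-server-disabled flag resolves with the same precedence as the surrounding options: CLI flag first, then the config file value, then the default of false. A hypothetical config-file entry follows; the key name is taken from the hubConfig lookup above, but the file name and shape are illustrative, not the repo's actual config format.

// hub.config.js (illustrative file name and shape)
export default {
  httpServerDisabled: false, // set to true to turn the HTTP API server off
};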
12 changes: 11 additions & 1 deletion apps/hubble/src/hubble.ts
@@ -213,6 +213,9 @@ export interface HubOptions {
/** Commit lock queue size */
commitLockMaxPending?: number;

/** HTTP server disabled? */
httpServerDisabled?: boolean;

/** Enables the Admin Server */
adminServerEnabled?: boolean;

@@ -575,6 +578,11 @@

// Start the RPC server
await this.rpcServer.start(this.options.rpcServerHost, this.options.rpcPort ?? 0);
if (!this.options.httpServerDisabled) {
await this.httpApiServer.start(this.options.rpcServerHost, this.options.httpApiPort ?? 0);
} else {
log.info("HTTP API server disabled");
}
if (this.options.adminServerEnabled) {
await this.adminServer.start(this.options.adminServerHost ?? "127.0.0.1");
}
@@ -799,7 +807,9 @@
clearInterval(this.contactTimer);

// First, stop the RPC/Gossip server so we don't get any more messages

if (!this.options.httpServerDisabled) {
await this.httpApiServer.stop();
}
await this.rpcServer.stop(true); // Force shutdown until we have a graceful way of ending active streams

// Stop admin, gossip and sync engine
40 changes: 26 additions & 14 deletions apps/hubble/src/network/sync/merkleTrie.ts
@@ -112,9 +112,16 @@ class MerkleTrie {
this._worker.addListener("message", async (event) => {
// console.log("Received message from worker thread", event);
if (event.dbGetCallId) {
const value = await ResultAsync.fromPromise(this._db.get(Buffer.from(event.key)), (e) => e as Error);
if (value.isErr()) {
log.warn({ key: event.key, error: value.error }, "Error getting value from DB");
// This can happen sometimes in tests when the DB is closed before the worker thread shuts down
let value = undefined;
if (this._db.status === "closed") {
log.warn("DB is closed. Ignoring DB read request from merkle trie worker thread");
} else {
value = await ResultAsync.fromPromise(this._db.get(Buffer.from(event.key)), (e) => e as Error);
}

if (!value || value.isErr()) {
log.warn({ key: event.key, error: value?.error }, "Error getting value from DB");
this._worker.postMessage({
dbGetCallId: event.dbGetCallId,
value: undefined,
@@ -126,19 +133,24 @@
});
}
} else if (event.dbKeyValuesCallId) {
const keyValues = event.dbKeyValues as MerkleTrieKV[];
const txn = this._db.transaction();

// Collect all the pending DB updates into a single transaction batch
for (const { key, value } of keyValues) {
if (value && value.length > 0) {
txn.put(Buffer.from(key), Buffer.from(value));
} else {
txn.del(Buffer.from(key));
// This can happen sometimes in tests when the DB is closed before the worker thread shuts down
if (this._db.status === "closed") {
log.warn("DB is closed. Ignoring DB write request from merkle trie worker thread");
} else {
const keyValues = event.dbKeyValues as MerkleTrieKV[];
const txn = this._db.transaction();

// Collect all the pending DB updates into a single transaction batch
for (const { key, value } of keyValues) {
if (value && value.length > 0) {
txn.put(Buffer.from(key), Buffer.from(value));
} else {
txn.del(Buffer.from(key));
}
}
}

await this._db.commit(txn);
await this._db.commit(txn);
}
this._worker.postMessage({
dbKeyValuesCallId: event.dbKeyValuesCallId,
});
(Diffs for the remaining changed files are not shown.)
