diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 1b15e56d22..c61a658b2f 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -1,11 +1,7 @@ -## Motivation +## Why is this change needed? Describe why this issue should be fixed and link to any relevant design docs, issues or other relevant items. -## Change Summary - -Describe the changes being made in 1-2 concise sentences. - ## Merge Checklist _Choose all relevant options below by adding an `x` now or at any time before submitting for review_ @@ -14,8 +10,3 @@ _Choose all relevant options below by adding an `x` now or at any time before su - [ ] PR has a [changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets) - [ ] PR has been tagged with a change label(s) (i.e. documentation, feature, bugfix, or chore) - [ ] PR includes [documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs) if necessary. 
-- [ ] All [commits have been signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits) - -## Additional Context - -If this is a relatively large or complex change, provide more details here that will help reviewers diff --git a/apps/hubble/CHANGELOG.md b/apps/hubble/CHANGELOG.md index 6095ce38c7..dcc9017cd3 100644 --- a/apps/hubble/CHANGELOG.md +++ b/apps/hubble/CHANGELOG.md @@ -1,5 +1,17 @@ # @farcaster/hubble +## 1.13.6 + +### Patch Changes + +- fdcc3b52: chore: add stats for bundle message delays, stale contact info +- fa5eef40: fix: Increase message threshold to reduce snapshot bandwidth usage +- 795815af: fixed issue with cli arguments order in docker-compose.yml causing hub operator fid to be unset +- b5ff774a: feat: add hub service agreement - there will be no rewards for running a hub +- 2a82b3dc: feat: add unique peer map to sync engine to represent current active peers +- aa02a48d: fix: validate gossip message for clock skew +- 2bae6fb9: chore: Update curve25519-dalek from 4.1.1 to 4.1.3 in Rust extension + ## 1.13.5 ### Patch Changes @@ -23,7 +35,7 @@ - 2d26d305: CLI tool for measuring sync health - b150e900: fix: Use stricter socket timeout for gossip -- eacf29c9: fix: http endoint return not found instead of internal database error +- eacf29c9: fix: http endpoint return not found instead of internal database error - @farcaster/hub-nodejs@0.11.18 ## 1.13.2 @@ -608,7 +620,7 @@ - 08b652e: fix: Add txIndex to onchain events, fix wrong index being used in the primary key - b36eef2: fix: Extract snapshot on the fly while downloading snapshot - 93e43a8: fix: Use hashes to compare upgrade 'hubble.sh' versions -- 7daaae4: fix: Simplify IP addr fetching, prefering ipv4 +- 7daaae4: fix: Simplify IP addr fetching, preferring ipv4 - ac1f6ac: fix: Fetch envoy config during hubble.sh - baf983f: fix: Consume the FID rate limit only after a successful merge - Updated dependencies [08b652e] @@ -742,7 +754,7 @@ - f00d7d2: fix: 
Move validatorOrRevokeMessage and storageCache iterators to be managed - 115f1b5: feat: Do the validateOrRevokeMessages job fid-by-fid - 998979d: feat: Warn if there are no incoming connections -- c1bb21c: fix: When retring messages due to failed signers, use a queue +- c1bb21c: fix: When retrying messages due to failed signers, use a queue - 376ae0f: feat: Use a web based network config for hubble - @farcaster/hub-nodejs@0.9.1 diff --git a/apps/hubble/docker-compose.yml b/apps/hubble/docker-compose.yml index d70f7be27d..dd2736e0d1 100644 --- a/apps/hubble/docker-compose.yml +++ b/apps/hubble/docker-compose.yml @@ -8,8 +8,6 @@ # # git fetch --tags --force && git checkout @latest && docker compose up -version: '3.9' - services: hubble: image: sigeshuo/hubble:latest @@ -35,10 +33,10 @@ services: --eth-mainnet-rpc-url $ETH_MAINNET_RPC_URL --l2-rpc-url $OPTIMISM_L2_RPC_URL --network ${FC_NETWORK_ID:-1} + --hub-operator-fid ${HUB_OPERATOR_FID:-0} --rpc-subscribe-per-ip-limit ${RPC_SUBSCRIBE_PER_IP_LIMIT:-4} -b ${BOOTSTRAP_NODE:-/dns/nemes.farcaster.xyz/tcp/2282} --statsd-metrics-server $STATSD_METRICS_SERVER - --hub-operator-fid ${HUB_OPERATOR_FID:-0} --opt-out-diagnostics ${HUB_OPT_OUT_DIAGNOSTICS:-false} ${HUB_OPTIONS:-} ports: diff --git a/apps/hubble/package.json b/apps/hubble/package.json index a9c1cab45e..b550777b43 100644 --- a/apps/hubble/package.json +++ b/apps/hubble/package.json @@ -1,6 +1,6 @@ { "name": "@farcaster/hubble", - "version": "1.13.5", + "version": "1.13.6", "description": "Farcaster Hub", "author": "", "license": "", diff --git a/apps/hubble/src/addon/Cargo.lock b/apps/hubble/src/addon/Cargo.lock index 2afa2fd9a5..f306dcf62b 100644 --- a/apps/hubble/src/addon/Cargo.lock +++ b/apps/hubble/src/addon/Cargo.lock @@ -433,16 +433,15 @@ dependencies = [ [[package]] name = "curve25519-dalek" -version = "4.1.1" +version = "4.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"e89b8c6a2e4b1f45971ad09761aafb85514a84744b67a95e32c3cc1352d1f65c" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" dependencies = [ "cfg-if", "cpufeatures", "curve25519-dalek-derive", "digest", "fiat-crypto", - "platforms", "rustc_version", "subtle", "zeroize", @@ -1221,12 +1220,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" -[[package]] -name = "platforms" -version = "3.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4503fa043bf02cee09a9582e9554b4c6403b2ef55e4612e96561d294419429f8" - [[package]] name = "powerfmt" version = "0.2.0" diff --git a/apps/hubble/src/defaultConfig.ts b/apps/hubble/src/defaultConfig.ts index e335c9c21a..60b3afe3fb 100644 --- a/apps/hubble/src/defaultConfig.ts +++ b/apps/hubble/src/defaultConfig.ts @@ -10,7 +10,7 @@ const DEFAULT_GOSSIP_PORT = 2282; const DEFAULT_RPC_PORT = 2283; const DEFAULT_HTTP_API_PORT = 2281; const DEFAULT_NETWORK = 3; // Farcaster Devnet -export const DEFAULT_CATCHUP_SYNC_SNAPSHOT_MESSAGE_LIMIT = 25_000_000; +export const DEFAULT_CATCHUP_SYNC_SNAPSHOT_MESSAGE_LIMIT = 50_000_000; export const Config = { /** Path to a PeerId file */ diff --git a/apps/hubble/src/hubble.ts b/apps/hubble/src/hubble.ts index 77cf7f5c9b..e6a6f65091 100644 --- a/apps/hubble/src/hubble.ts +++ b/apps/hubble/src/hubble.ts @@ -5,6 +5,7 @@ import { ContactInfoContent, ContactInfoContentBody, FarcasterNetwork, + fromFarcasterTime, getInsecureHubRpcClient, getSSLHubRpcClient, GossipAddressInfo, @@ -125,13 +126,19 @@ export const FARCASTER_VERSIONS_SCHEDULE: VersionSchedule[] = [ const MAX_CONTACT_INFO_AGE_MS = 1000 * 60 * 60; // 60 minutes const CONTACT_INFO_UPDATE_THRESHOLD_MS = 1000 * 60 * 30; // 30 minutes +const ALLOWED_CLOCK_SKEW_SECONDS = 60 * 10; // 10 minutes export interface HubInterface { engine: Engine; identity: string; hubOperatorFid?: number; 
submitMessage(message: Message, source?: HubSubmitSource): HubAsyncResult; - submitMessageBundle(messageBundle: MessageBundle, source?: HubSubmitSource): Promise[]>; + submitMessageBundle( + creationFarcasterTime: number, + messageBundle: MessageBundle, + source?: HubSubmitSource, + peerId?: PeerId, + ): Promise[]>; validateMessage(message: Message): HubAsyncResult; submitUserNameProof(usernameProof: UserNameProof, source?: HubSubmitSource): HubAsyncResult; submitOnChainEvent(event: OnChainEvent, source?: HubSubmitSource): HubAsyncResult; @@ -1253,6 +1260,7 @@ export class Hub implements HubInterface { await this.gossipNode.gossipContactInfo(contactInfo); statsd().gauge("peer_store.count", await this.gossipNode.peerStoreCount()); + statsd().gauge("active_peers.count", this.syncEngine.getActivePeerCount()); return Promise.resolve(ok(undefined)); } } @@ -1267,6 +1275,24 @@ export class Hub implements HubInterface { const messageFirstGossipedTime = gossipMessage.timestamp ?? 0; const gossipMessageDelay = currentTime - messageFirstGossipedTime; if (gossipMessage.timestamp) { + if (gossipMessage.timestamp > currentTime && gossipMessage.timestamp - currentTime > ALLOWED_CLOCK_SKEW_SECONDS) { + log.error( + { + allowedClockSkew: ALLOWED_CLOCK_SKEW_SECONDS, + currentTime, + gossipMessageTimestamp: gossipMessage.timestamp, + source: source.toString(), + }, + "Received gossip message with future timestamp", + ); + await this.gossipNode.reportValid(msgId, peerIdFromString(source.toString()).toBytes(), false); + return err( + new HubError( + "bad_request.invalid_param", + "Invalid Farcaster timestamp in gossip message - future timestamp found in seconds from Farcaster Epoch", + ), + ); + } // If message is older than seenTTL, we will try to merge it, but report it as invalid so it doesn't // propogate across the network const cutOffTime = getFarcasterTime().unwrapOr(0) - GOSSIP_SEEN_TTL / 1000; @@ -1340,14 +1366,13 @@ export class Hub implements HubInterface { await 
this.gossipNode.reportValid(msgId, peerIdFromString(source.toString()).toBytes(), false); } } - statsd().timing("gossip.message_delay", gossipMessageDelay); const mergeResult = result.isOk() ? "success" : "failure"; - statsd().timing(`gossip.message_delay.${mergeResult}`, gossipMessageDelay); + statsd().timing("gossip.message_delay", gossipMessageDelay, { status: mergeResult }); return result.map(() => undefined); } else if (gossipMessage.messageBundle) { const bundle = gossipMessage.messageBundle; - const results = await this.submitMessageBundle(bundle, "gossip"); + const results = await this.submitMessageBundle(messageFirstGossipedTime, bundle, "gossip", source); // If at least one is Ok, report as valid const atLeastOneOk = results.find((r) => r.isOk()); @@ -1414,6 +1439,7 @@ export class Hub implements HubInterface { // Don't process messages that are too old if (message.timestamp && message.timestamp < Date.now() - MAX_CONTACT_INFO_AGE_MS) { + statsd().increment("gossip.contact_info.too_old", 1); log.debug({ message }, "contact info message is too old"); return false; } @@ -1660,13 +1686,40 @@ export class Hub implements HubInterface { this.syncEngine.removeContactInfoForPeerId(connection.remotePeer.toString()); statsd().increment("peer_disconnect.count"); }); + + this.gossipNode.on("peerDiscovery", async (peerInfo) => { + // NB: The current code is not in use - we would require a source to emit peerDiscovery events. + // This is a placeholder for future use. 
+ + // Add discovered peer to sync engine + const peerId = peerInfo.id; + const peerAddresses = peerInfo.multiaddrs; + + // sorts addresses by Public IPs first + const addr = peerAddresses.sort((a, b) => + publicAddressesFirst({ multiaddr: a, isCertified: false }, { multiaddr: b, isCertified: false }), + )[0]; + if (addr === undefined) { + log.info( + { function: "peerDiscovery", peerId: peerId.toString() }, + "peer found but no address is available to request sync", + ); + return; + } + await this.gossipNode.addPeerToAddressBook(peerId, addr); + }); } /* -------------------------------------------------------------------------- */ /* RPC Handler API */ /* -------------------------------------------------------------------------- */ - async submitMessageBundle(messageBundle: MessageBundle, source?: HubSubmitSource): Promise[]> { + async submitMessageBundle( + creationFarcasterTime: number, + messageBundle: MessageBundle, + source?: HubSubmitSource, + peerId?: PeerId, + ): Promise[]> { if (this.syncEngine.syncTrieQSize > MAX_SYNCTRIE_QUEUE_SIZE) { log.warn({ syncTrieQSize: this.syncEngine.syncTrieQSize }, "SubmitMessage rejected: Sync trie queue is full"); // Since we're rejecting the full bundle, return an error for each message @@ -1679,6 +1732,8 @@ export class Hub implements HubInterface { const allResults: Map> = new Map(); const dedupedMessages: { i: number; message: Message }[] = []; + let earliestTimestamp = Infinity; + let latestTimestamp = -Infinity; if (source === "gossip") { // Go over all the messages and see if they are in the DB. 
If they are, don't bother processing them const messagesExist = await areMessagesInDb(this.rocksDB, messageBundle.messages); @@ -1688,16 +1743,57 @@ export class Hub implements HubInterface { log.debug({ source }, "submitMessageBundle rejected: Message already exists"); allResults.set(i, err(new HubError("bad_request.duplicate", "message has already been merged"))); } else { - dedupedMessages.push({ i, message: ensureMessageData(messageBundle.messages[i] as Message) }); + const message = ensureMessageData(messageBundle.messages[i] as Message); + earliestTimestamp = Math.min(earliestTimestamp, message.data?.timestamp ?? Infinity); + latestTimestamp = Math.max(latestTimestamp, message.data?.timestamp ?? -Infinity); + dedupedMessages.push({ i, message }); } } } else { - dedupedMessages.push(...messageBundle.messages.map((message, i) => ({ i, message: ensureMessageData(message) }))); + const initialResult = { + earliest: Infinity, + latest: -Infinity, + deduped: [] as { i: number; message: Message }[], + }; + const { deduped, earliest, latest } = messageBundle.messages.reduce((acc, message, i) => { + const messageData = ensureMessageData(message); + acc.earliest = Math.min(acc.earliest, messageData.data?.timestamp ?? Infinity); + acc.latest = Math.max(acc.latest, messageData.data?.timestamp ?? -Infinity); + acc.deduped.push({ i, message: messageData }); + return acc; + }, initialResult); + earliestTimestamp = earliest; + latestTimestamp = latest; + dedupedMessages.push(...deduped); } + const tags: { [key: string]: string } = { + ...(source ? { source } : {}), + ...(peerId ? 
{ peer_id: peerId.toString() } : {}), + }; + + statsd().gauge("hub.submit_message_bundle.size", dedupedMessages.length, tags); + statsd().gauge( + "hub.submit_message_bundle.earliest_timestamp_ms", + fromFarcasterTime(earliestTimestamp).unwrapOr(0), + tags, + ); + statsd().gauge( + "hub.submit_message_bundle.latest_timestamp_ms", + fromFarcasterTime(latestTimestamp).unwrapOr(0), + tags, + ); + statsd().gauge( + "hub.submit_message_bundle.creation_time_ms", + fromFarcasterTime(creationFarcasterTime).unwrapOr(0), + tags, + ); + statsd().gauge("hub.submit_message_bundle.max_delay_ms", creationFarcasterTime - earliestTimestamp, tags); // Merge the messages const mergeResults = await this.engine.mergeMessages(dedupedMessages.map((m) => m.message)); + const errorLogs: string[] = []; + const infoLogs: string[] = []; for (const [j, result] of mergeResults.entries()) { const message = dedupedMessages[j]?.message as Message; const type = messageTypeToName(message.data?.type); @@ -1706,32 +1802,28 @@ export class Hub implements HubInterface { result.match( (eventId) => { - if (this.options.logIndividualMessages) { - const logData = { - eventId, - fid: message.data?.fid, - type: type, - submittedMessage: messageToLog(message), - source, - }; - const msg = "submitMessage success"; - - if (source === "sync") { - log.debug(logData, msg); - } else { - log.info(logData, msg); - } - } else { - this.submitMessageLogger.log(source ?? "unknown-source"); - } + const parts = [ + `event_id:${eventId}`, + `farcaster_ts:${message.data?.timestamp ?? "no-timestamp"}`, + `fid:${message.data?.fid ?? 
"no-fid"}`, + `hash:${bytesToHexString(message.hash).unwrapOr("no-hash")}`, + `message_type:${type}`, + `source:${source}`, + ]; + infoLogs.push(`[${parts.join("|")}]`); }, (e) => { - // message is a reserved key in some logging systems, so we use submittedMessage instead - const logMessage = log.child({ - submittedMessage: messageToLog(message), - source, - }); - logMessage.warn({ errCode: e.errCode, source }, `submitMessage error: ${e.message}`); + const parts = [ + `farcaster_ts:${message.data?.timestamp ?? "no-timestamp"}`, + `fid:${message.data?.fid ?? "no-fid"}`, + `hash:${bytesToHexString(message.hash).unwrapOr("no-hash")}`, + `message_type:${type}`, + `source:${source ?? "unknown-source"}`, + `error:${e.message}`, + `error_code:${e.errCode}`, + ]; + errorLogs.push(`[${parts.join("|")}]`); + const tags: { [key: string]: string } = { error_code: e.errCode, message_type: type, @@ -1741,6 +1833,12 @@ export class Hub implements HubInterface { }, ); } + if (infoLogs.length > 0) { + log.info(infoLogs, "successful submit messages"); + } + if (errorLogs.length > 0) { + log.error(errorLogs, "failed submit messages"); + } // Convert the merge results to an Array of HubResults with the key const finalResults: HubResult[] = []; diff --git a/apps/hubble/src/network/p2p/gossipNode.ts b/apps/hubble/src/network/p2p/gossipNode.ts index 77e5412c02..df3b9b43a0 100644 --- a/apps/hubble/src/network/p2p/gossipNode.ts +++ b/apps/hubble/src/network/p2p/gossipNode.ts @@ -1,5 +1,6 @@ import { PublishResult } from "@libp2p/interface-pubsub"; import { Worker } from "worker_threads"; +import { PeerInfo } from "@libp2p/interface-peer-info"; import { ContactInfoContent, FarcasterNetwork, @@ -34,7 +35,7 @@ import { sleep } from "../../utils/crypto.js"; /** The maximum number of pending merge messages before we drop new incoming gossip or sync messages. 
*/ export const MAX_SYNCTRIE_QUEUE_SIZE = 100_000; /** The TTL for messages in the seen cache */ -export const GOSSIP_SEEN_TTL = 1000 * 60 * 5; +export const GOSSIP_SEEN_TTL = 1000 * 60 * 5; // 5 minutes /** The maximum amount of time to dial a peer in libp2p network in milliseconds */ export const LIBP2P_CONNECT_TIMEOUT_MS = 2000; @@ -50,6 +51,8 @@ interface NodeEvents { peerConnect: (connection: Connection) => void; /** Triggers when a peer disconnects and includes the libp2p Connection object */ peerDisconnect: (connection: Connection) => void; + /** Triggers when a peer is discovered and includes the libp2p PeerInfo object */ + peerDiscovery: (peerInfo: PeerInfo) => void; } /** Optional arguments provided when creating a Farcaster Gossip Node */ diff --git a/apps/hubble/src/network/sync/syncEngine.ts b/apps/hubble/src/network/sync/syncEngine.ts index 3d97562213..9a1c65bc16 100644 --- a/apps/hubble/src/network/sync/syncEngine.ts +++ b/apps/hubble/src/network/sync/syncEngine.ts @@ -52,6 +52,14 @@ import { PeerScore, PeerScorer } from "./peerScore.js"; import { getOnChainEvent } from "../../storage/db/onChainEvent.js"; import { getUserNameProof } from "../../storage/db/nameRegistryEvent.js"; import { MaxPriorityQueue } from "@datastructures-js/priority-queue"; +import { TTLMap } from "../../utils/ttl_map.js"; +import * as buffer from "node:buffer"; +import { peerIdFromString } from "@libp2p/peer-id"; + +// Time to live for peer contact info in the Peer TTLMap +const PEER_TTL_MAP_EXPIRATION_TIME_MILLISECONDS = 1000 * 60 * 60 * 24; // 24 hours +// Time interval to run cleanup on the Peer TTLMap +const PEER_TTL_MAP_CLEANUP_INTERVAL_MILLISECONDS = 1000 * 60 * 60 * 25; // 25 hours // Number of seconds to wait for the network to "settle" before syncing. We will only // attempt to sync messages that are older than this time. 
@@ -246,6 +254,7 @@ class SyncEngine extends TypedEmitter { private _syncProfiler?: SyncEngineProfiler; private currentHubPeerContacts: Map = new Map(); + private uniquePeerMap: TTLMap; // Number of messages waiting to get into the SyncTrie. private _syncTrieQ = 0; @@ -282,6 +291,10 @@ class SyncEngine extends TypedEmitter { ) { super(); + this.uniquePeerMap = new TTLMap( + PEER_TTL_MAP_EXPIRATION_TIME_MILLISECONDS, + PEER_TTL_MAP_CLEANUP_INTERVAL_MILLISECONDS, + ); this._db = rocksDb; this._trie = new MerkleTrie(rocksDb); this._l2EventsProvider = l2EventsProvider; @@ -541,12 +554,17 @@ class SyncEngine extends TypedEmitter { return this.currentHubPeerContacts.size; } + public getActivePeerCount(): number { + return this.uniquePeerMap.size(); + } + public getContactInfoForPeerId(peerId: string): PeerContact | undefined { return this.currentHubPeerContacts.get(peerId); } public getCurrentHubPeerContacts() { - return this.currentHubPeerContacts.values(); + // return non-expired active peers + return this.uniquePeerMap.getAll(); } public addContactInfoForPeerId( @@ -577,7 +595,9 @@ class SyncEngine extends TypedEmitter { }, "Updated Peer ContactInfo", ); - this.currentHubPeerContacts.set(peerId.toString(), { peerId, contactInfo }); + const id = peerId.toString(); + this.uniquePeerMap.set(id, contactInfo); + this.currentHubPeerContacts.set(id, { peerId, contactInfo }); return ok(undefined); } else { return err(new HubError("bad_request.duplicate", "recent contact update found for peer")); @@ -585,6 +605,11 @@ class SyncEngine extends TypedEmitter { } public removeContactInfoForPeerId(peerId: string) { + // NB: We don't remove the peer from the uniquePeerMap here - there can be many (valid) reasons to remove a + // peer from current hub peer contacts that are due to peer behavior but not indicative of whether that peer is active. + // For example, the peer may have a bad peer score or be disallowed for this hub, but it may still be an active peer. 
+ // Since the unique peer map is meant to return the set of all active non-expired peers, we only remove + // peers from it when their TTL expires (i.e, if we don't get updates to their contact info for 24 hours) this.currentHubPeerContacts.delete(peerId); } @@ -636,7 +661,16 @@ class SyncEngine extends TypedEmitter { // Use a buffer of 5% of our messages so the peer with the highest message count does not get picked // disproportionately const messageThreshold = snapshotResult.value.numMessages * 0.95; - peers = Array.from(this.currentHubPeerContacts.values()).filter((p) => p.contactInfo.count > messageThreshold); + peers = this.uniquePeerMap + .getAll() + .filter((p) => this.currentHubPeerContacts.has(p[0]) && p[1].count > messageThreshold) + .map( + (p) => + ({ + peerId: peerIdFromString(p[0]), + contactInfo: p[1], + }) as PeerContact, + ); } if (peers.length === 0) { @@ -1186,8 +1220,8 @@ class SyncEngine extends TypedEmitter { this._syncMergeQ += messages.length; statsd().gauge("syncengine.merge_q", this._syncMergeQ); - const startTime = Date.now(); - const results = await this._hub.submitMessageBundle(MessageBundle.create({ messages }), "sync"); + const startTime = getFarcasterTime().unwrapOr(0); + const results = await this._hub.submitMessageBundle(startTime, MessageBundle.create({ messages }), "sync"); for (let i = 0; i < results.length; i++) { const result = results[i] as HubResult; @@ -1533,16 +1567,25 @@ class SyncEngine extends TypedEmitter { const result = await this.validateAndMergeOnChainEvents( missingSyncIds.filter((id) => id.type() === SyncIdType.OnChainEvent), ); + statsd().increment("syncengine.sync_messages.onchain.success", result.successCount); + statsd().increment("syncengine.sync_messages.onchain.error", result.errCount); + statsd().increment("syncengine.sync_messages.onchain.deferred", result.deferredCount); // Then Fnames const fnameResult = await this.validateAndMergeFnames( missingSyncIds.filter((id) => id.type() === SyncIdType.FName), 
); result.addResult(fnameResult); + statsd().increment("syncengine.sync_messages.fname.success", fnameResult.successCount); + statsd().increment("syncengine.sync_messages.fname.error", fnameResult.errCount); + statsd().increment("syncengine.sync_messages.fname.deferred", fnameResult.deferredCount); // And finally messages const messagesResult = await this.fetchAndMergeMessages(missingMessageIds, rpcClient); result.addResult(messagesResult); + statsd().increment("syncengine.sync_messages.message.success", messagesResult.successCount); + statsd().increment("syncengine.sync_messages.message.error", messagesResult.errCount); + statsd().increment("syncengine.sync_messages.message.deferred", messagesResult.deferredCount); this.curSync.fullResult.addResult(result); diff --git a/apps/hubble/src/rpc/server.ts b/apps/hubble/src/rpc/server.ts index 83ff5134b7..5cd0c5c8f0 100644 --- a/apps/hubble/src/rpc/server.ts +++ b/apps/hubble/src/rpc/server.ts @@ -495,7 +495,7 @@ export default class Server { return; } - const contactInfoArray = Array.from(currentHubPeerContacts).map((peerContact) => peerContact.contactInfo); + const contactInfoArray = Array.from(currentHubPeerContacts).map((peerContact) => peerContact[1]); callback(null, ContactInfoResponse.create({ contacts: contactInfoArray })); })(); }, diff --git a/apps/hubble/src/test/e2e/hubbleNetwork.test.ts b/apps/hubble/src/test/e2e/hubbleNetwork.test.ts index 6c85ffa3dd..1dc5d01777 100644 --- a/apps/hubble/src/test/e2e/hubbleNetwork.test.ts +++ b/apps/hubble/src/test/e2e/hubbleNetwork.test.ts @@ -147,7 +147,7 @@ describe("hubble gossip and sync tests", () => { messages: castAddMessages, }); // Submit it to hub1 via rpc so it gets gossiped - const bundleMergeResult = await hub1.submitMessageBundle(messageBundle, "rpc"); + const bundleMergeResult = await hub1.submitMessageBundle(Date.now(), messageBundle, "rpc"); expect(bundleMergeResult.length).toEqual(5); for (let i = 0; i < 5; i++) { 
expect(bundleMergeResult[i]?.isOk()).toBeTruthy(); @@ -167,7 +167,7 @@ describe("hubble gossip and sync tests", () => { } // Submitting the same bundle again should result in a duplicate error if we submit via gossip - const errResult2 = await hub1.submitMessageBundle(messageBundle, "gossip"); + const errResult2 = await hub1.submitMessageBundle(Date.now(), messageBundle, "gossip"); // Expect all the messages to be duplicates for (let i = 0; i < 5; i++) { expect(errResult2[i]?.isErr()).toBeTruthy(); @@ -193,7 +193,7 @@ describe("hubble gossip and sync tests", () => { hash: new Uint8Array([0, 1, 2, 1, 2]), messages: castAddMessages2, }); - const errResult3 = await hub1.submitMessageBundle(messageBundle2, "gossip"); + const errResult3 = await hub1.submitMessageBundle(Date.now(), messageBundle2, "gossip"); expect(errResult3.length).toEqual(5); expect(errResult3[0]?.isOk()).toBeTruthy(); expect(errResult3[1]?.isErr()).toBeTruthy(); diff --git a/apps/hubble/src/test/mocks.ts b/apps/hubble/src/test/mocks.ts index e166c3262a..77e2d11ef7 100644 --- a/apps/hubble/src/test/mocks.ts +++ b/apps/hubble/src/test/mocks.ts @@ -43,8 +43,10 @@ export class MockHub implements HubInterface { } async submitMessageBundle( + _creationTimeMs: number, messageBundle: MessageBundle, source?: HubSubmitSource | undefined, + _peerId?: PeerId, ): Promise[]> { const results: HubResult[] = []; for (const message of messageBundle.messages) { diff --git a/apps/hubble/src/utils/ttl_map.test.ts b/apps/hubble/src/utils/ttl_map.test.ts new file mode 100644 index 0000000000..b6e0107dff --- /dev/null +++ b/apps/hubble/src/utils/ttl_map.test.ts @@ -0,0 +1,142 @@ +import { TTLMap } from "./ttl_map.js"; +import { jest } from "@jest/globals"; + +describe("TTLMap", () => { + jest.useFakeTimers(); + + test("size should accurately reflect non-expired entries", () => { + const map = new TTLMap(1000); + + expect(map.size()).toBe(0); + + map.set("a", 1); + expect(map.size()).toBe(1); + + map.set("b", 2); + 
expect(map.size()).toBe(2); + + jest.advanceTimersByTime(500); + map.set("c", 3); + expect(map.size()).toBe(3); + + jest.advanceTimersByTime(501); + expect(map.size()).toBe(1); + + map.get("a"); // This should trigger cleanup of expired entry + expect(map.size()).toBe(1); + }); + + test("set should not double count when updating existing entries", () => { + const map = new TTLMap(1000); + + map.set("a", 1); + expect(map.size()).toBe(1); + + map.set("a", 2); // Updating existing non-expired entry + expect(map.size()).toBe(1); + + jest.advanceTimersByTime(1001); + map.set("a", 3); // Updating expired entry + expect(map.size()).toBe(1); + }); + + test("delete should correctly update size for expired and non-expired entries", () => { + const map = new TTLMap(1000); + + map.set("a", 1); + map.set("b", 2); + expect(map.size()).toBe(2); + + map.delete("a"); + expect(map.size()).toBe(1); + + jest.advanceTimersByTime(1001); + expect(map.delete("b")).toBe(true); // Deleting expired entry + expect(map.size()).toBe(0); + }); + + test("get should update size when retrieving expired entries", () => { + const map = new TTLMap(1000); + + map.set("a", 1); + expect(map.size()).toBe(1); + + jest.advanceTimersByTime(1001); + expect(map.get("a")).toBeUndefined(); + expect(map.size()).toBe(0); + }); + + test("resetTTL should correctly handle expired and non-expired entries", () => { + const map = new TTLMap(1000); + + map.set("a", 1); + expect(map.size()).toBe(1); + + jest.advanceTimersByTime(500); + map.resetTTL("a"); + expect(map.size()).toBe(1); + + jest.advanceTimersByTime(750); + expect(map.get("a")).toBe(1); + expect(map.size()).toBe(1); + + jest.advanceTimersByTime(251); + map.resetTTL("a"); + expect(map.size()).toBe(1); + expect(map.get("a")).toBe(1); + }); + + test("getAll should return only non-expired entries", () => { + const map = new TTLMap(1000); + + map.set("a", 1); + map.set("b", 2); + map.set("c", 3); + + jest.advanceTimersByTime(500); + map.set("d", 4); + + 
jest.advanceTimersByTime(501); + + const allEntries = map.getAll(); + expect(allEntries).toHaveLength(1); + expect(allEntries[0]).toEqual(["d", 4]); + expect(map.size()).toBe(1); + }); + + test("clear should reset size to zero", () => { + const map = new TTLMap(1000); + + map.set("a", 1); + map.set("b", 2); + expect(map.size()).toBe(2); + + map.clear(); + expect(map.size()).toBe(0); + + map.set("c", 3); + expect(map.size()).toBe(1); + }); + + test("size should be accurate after multiple operations", () => { + const map = new TTLMap(1000); + + map.set("a", 1); + map.set("b", 2); + map.set("c", 3); + expect(map.size()).toBe(3); + + jest.advanceTimersByTime(500); + map.delete("b"); + map.set("d", 4); + expect(map.size()).toBe(3); + + jest.advanceTimersByTime(501); + map.get("a"); // This should trigger cleanup + expect(map.size()).toBe(1); + + map.set("e", 5); + map.resetTTL("d"); + expect(map.size()).toBe(2); + }); +}); diff --git a/apps/hubble/src/utils/ttl_map.ts b/apps/hubble/src/utils/ttl_map.ts new file mode 100644 index 0000000000..c84af24170 --- /dev/null +++ b/apps/hubble/src/utils/ttl_map.ts @@ -0,0 +1,152 @@ +/** + * TTLMap - A high-performance Time-To-Live Map implementation + * + * This data structure provides a Map-like interface with automatic expiration of entries. + * It is designed for scenarios with thousands of elements and frequent bulk retrieval operations. 
+ * + * Key features: + * - Constant-time complexity for get, set, and delete operations + * - Efficient bulk retrieval of non-expired entries + * - Coarse-grained TTL defined at the Map level + * - Ability to reset TTL for individual entries + * + * @template K The type of keys in the map + * @template V The type of values in the map + */ +export class TTLMap { + private map: Map; + private readonly ttl: number; + private lastCleanup: number; + private readonly cleanupInterval: number; + private nonExpiredCount: number; + + /** + * @param ttl Time-to-live in milliseconds for entries + * @param cleanupInterval Optional interval for running the cleanup process (default: ttl / 2) + */ + constructor(ttl: number, cleanupInterval?: number) { + this.map = new Map(); + this.ttl = ttl; + this.lastCleanup = Date.now(); + this.cleanupInterval = cleanupInterval || Math.floor(ttl / 2); + this.nonExpiredCount = 0; + } + + /** + * Sets a key-value pair in the map with the current TTL + * @param key The key to set + * @param value The value to set + * @returns The TTLMap instance for method chaining + */ + set(key: K, value: V): this { + const now = Date.now(); + const expiresAt = now + this.ttl; + const existingEntry = this.map.get(key); + + if (!existingEntry) { + this.nonExpiredCount++; + } + + this.map.set(key, { value, expiresAt }); + this.cleanupIfNeeded(); + return this; + } + + /** + * Retrieves a value from the map if it exists and hasn't expired + * @param key The key to retrieve + * @returns The value if found and not expired, undefined otherwise + */ + get(key: K): V | undefined { + const entry = this.map.get(key); + if (entry) { + if (entry.expiresAt > Date.now()) { + return entry.value; + } else { + this.map.delete(key); + this.nonExpiredCount--; + } + } + return undefined; + } + + /** + * Deletes a key-value pair from the map + * @param key The key to delete + * @returns true if the key was present in the map (even if its entry had already expired), false otherwise + */ + delete(key: K): 
boolean { + const entry = this.map.get(key); + if (entry) { + if (entry.expiresAt > Date.now()) { + this.nonExpiredCount--; + } + return this.map.delete(key); + } + return false; + } + + /** + * Resets the TTL for a given key without changing its value + * @param key The key to reset the TTL for + * @returns true if the key exists and TTL was reset, false otherwise + */ + resetTTL(key: K): boolean { + const entry = this.map.get(key); + if (entry) { + entry.expiresAt = Date.now() + this.ttl; + return true; + } + return false; + } + + /** + * Retrieves all non-expired entries from the map + * This method is optimized for frequent calls and large datasets + * @returns An array of [key, value] pairs for all non-expired entries + */ + getAll(): [K, V][] { + this.cleanupIfNeeded(); + const now = Date.now(); + return Array.from(this.map.entries()) + .filter(([, { expiresAt }]) => expiresAt > now) + .map(([key, { value }]) => [key, value]); + } + + /** + * Clears all entries from the map + */ + clear(): void { + this.nonExpiredCount = 0; + this.map.clear(); + } + + /** + * Returns the number of non-expired entries in the map + * @returns The number of non-expired entries in the map + */ + size(): number { + this.cleanupIfNeeded(); + return this.nonExpiredCount; + } + + /** + * Performs cleanup of expired entries if the cleanup interval has elapsed + * This method is called internally and doesn't need to be invoked manually + */ + private cleanupIfNeeded(): void { + const now = Date.now(); + if (now - this.lastCleanup > this.cleanupInterval) { + let nonExpired = 0; + this.map.forEach((entry, key) => { + if (entry.expiresAt <= now) { + this.map.delete(key); + } else { + nonExpired++; + } + }); + this.lastCleanup = now; + this.nonExpiredCount = nonExpired; + } + } +} diff --git a/apps/hubble/www/docs/intro/install.md b/apps/hubble/www/docs/intro/install.md index 92758daf21..92eccbc330 100644 --- a/apps/hubble/www/docs/intro/install.md +++ 
b/apps/hubble/www/docs/intro/install.md @@ -8,7 +8,7 @@ Hubble can be installed in 30 minutes, and a full sync can take 1-2 hours to com - 16 GB of RAM - 4 CPU cores or vCPUs -- 140 GB of free storage +- 160 GB of free storage - A public IP address with ports 2282 - 2285 exposed See [tutorials](./tutorials.html) for instructions on how to set up cloud providers to run Hubble. diff --git a/apps/hubble/www/docs/tutorials/gcp.md b/apps/hubble/www/docs/tutorials/gcp.md index c5cd7aa959..5cd79d8c06 100644 --- a/apps/hubble/www/docs/tutorials/gcp.md +++ b/apps/hubble/www/docs/tutorials/gcp.md @@ -44,7 +44,7 @@ resource "google_compute_instance" "farcaster-hub-vm" { boot_disk { initialize_params { image = "ubuntu-2004-focal-v20231213" # Ubuntu 20.04 LTS image URL - size = 60 # 60 GB disk size + size = 160 # 160 GB disk size } } diff --git a/packages/shuttle/CHANGELOG.md b/packages/shuttle/CHANGELOG.md index 9f6a9239a3..019239cf87 100644 --- a/packages/shuttle/CHANGELOG.md +++ b/packages/shuttle/CHANGELOG.md @@ -1,5 +1,29 @@ # @farcaster/hub-shuttle +## 0.5.1 + +### Patch Changes + +- Fix reset of limit for total batch bytes + +## 0.5.0 + +### Minor Changes + +- Support customization of event batch size and time between flushes + +## 0.4.4 + +### Patch Changes + +- 864261b7: feat(shuttle) Allow Redis client to be a cluster instance + +## 0.4.3 + +### Patch Changes + +- 45180584: Gracefully handle "no such key" when querying group on first start + ## 0.4.2 ### Patch Changes diff --git a/packages/shuttle/package.json b/packages/shuttle/package.json index fb414a7a4b..c7f1801cf8 100644 --- a/packages/shuttle/package.json +++ b/packages/shuttle/package.json @@ -1,6 +1,6 @@ { "name": "@farcaster/shuttle", - "version": "0.4.2", + "version": "0.5.1", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", diff --git a/packages/shuttle/src/example-app/package.json b/packages/shuttle/src/example-app/package.json index 344bd42273..5820babc1c 100644 --- 
a/packages/shuttle/src/example-app/package.json +++ b/packages/shuttle/src/example-app/package.json @@ -4,7 +4,7 @@ "main": "./app.ts", "license": "MIT", "dependencies": { - "@farcaster/shuttle": "^0.2.0", + "@farcaster/shuttle": "^0.5.0", "@figma/hot-shots": "^9.0.0-figma.1", "commander": "^11.0.0", "ioredis": "^5.3.2", diff --git a/packages/shuttle/src/example-app/worker.ts b/packages/shuttle/src/example-app/worker.ts index ccecc262b8..d3deff5c0c 100644 --- a/packages/shuttle/src/example-app/worker.ts +++ b/packages/shuttle/src/example-app/worker.ts @@ -1,11 +1,11 @@ -import { Redis } from "ioredis"; +import { Cluster, Redis } from "ioredis"; import { Job, Queue, Worker } from "bullmq"; import { App } from "./app"; import { pino } from "pino"; const QUEUE_NAME = "default"; -export function getWorker(app: App, redis: Redis, log: pino.Logger, concurrency = 1) { +export function getWorker(app: App, redis: Redis | Cluster, log: pino.Logger, concurrency = 1) { const worker = new Worker( QUEUE_NAME, async (job: Job) => { @@ -38,7 +38,7 @@ export function getWorker(app: App, redis: Redis, log: pino.Logger, concurrency return worker; } -export function getQueue(redis: Redis) { +export function getQueue(redis: Redis | Cluster) { return new Queue("default", { connection: redis, defaultJobOptions: { attempts: 3, backoff: { delay: 1000, type: "exponential" } }, diff --git a/packages/shuttle/src/shuttle/eventStream.ts b/packages/shuttle/src/shuttle/eventStream.ts index 7cbc98eb53..1a86bfd766 100644 --- a/packages/shuttle/src/shuttle/eventStream.ts +++ b/packages/shuttle/src/shuttle/eventStream.ts @@ -42,11 +42,21 @@ export class EventStreamConnection { * Creates a consumer group for the given stream. 
*/ async createGroup(key: string, consumerGroup: string) { + // Check if the group already exists try { - // Check if the group already exists const groups = (await this.client.xinfo("GROUPS", key)) as [string, string][]; if (groups.some(([_fieldName, groupName]) => groupName === consumerGroup)) return; + } catch (e: unknown) { + if (typeof e === "object" && e !== null && e instanceof ReplyError) { + if ("message" in e && (e.message as string).startsWith("ERR no such key")) { + // Ignore if the group hasn't been created yet + } else { + throw e; + } + } + } + try { // Otherwise create the group return await this.client.xgroup("CREATE", key, consumerGroup, "0", "MKSTREAM"); } catch (e: unknown) { diff --git a/packages/shuttle/src/shuttle/hubSubscriber.ts b/packages/shuttle/src/shuttle/hubSubscriber.ts index 7cb0d87d20..bf42c65fb6 100644 --- a/packages/shuttle/src/shuttle/hubSubscriber.ts +++ b/packages/shuttle/src/shuttle/hubSubscriber.ts @@ -175,8 +175,12 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber { private redis: RedisClient; public readonly streamKey: string; public readonly redisKey: string; - private eventsToAdd: HubEvent[]; - private eventBatchSize = 100; + private eventsToAdd: [HubEvent, Buffer][]; + public eventBatchSize = 100; + private eventBatchLastFlushedAt = 0; + public maxTimeBetweenBatchFlushes = 200; // Millis + public maxBatchBytesBeforeForceFlush = 2 ** 20; // 1 MiB + private eventBatchBytes = 0; constructor( label: string, @@ -208,17 +212,30 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber { } public override async processHubEvent(event: HubEvent): Promise { - this.eventsToAdd.push(event); - if (this.eventsToAdd.length >= this.eventBatchSize) { - let lastEventId: number | undefined; - for (const evt of this.eventsToAdd) { - await this.eventStream.add(this.streamKey, Buffer.from(HubEvent.encode(evt).finish())); - lastEventId = evt.id; - } - if (lastEventId) { - await 
this.redis.setLastProcessedEvent(this.redisKey, lastEventId); - } - this.eventsToAdd = []; + const eventBytes = Buffer.from(HubEvent.encode(event).finish()); + this.eventBatchBytes += eventBytes.length; + this.eventsToAdd.push([event, eventBytes]); + if ( + this.eventsToAdd.length >= this.eventBatchSize || + this.eventBatchBytes >= this.maxBatchBytesBeforeForceFlush || + Date.now() - this.eventBatchLastFlushedAt > this.maxTimeBetweenBatchFlushes + ) { + // Empties the current batch + const eventBatch = this.eventsToAdd.splice(0, this.eventsToAdd.length); + this.eventBatchBytes = 0; + + // Copies the removed events to the stream + await this.eventStream.add( + this.streamKey, + eventBatch.map(([_event, eventBytes]) => eventBytes), + ); + + this.eventBatchLastFlushedAt = Date.now(); + + // biome-ignore lint/style/noNonNullAssertion: batch always has at least one event + const [evt, eventBytes] = eventBatch[eventBatch.length - 1]!; + const lastEventId = evt.id; + await this.redis.setLastProcessedEvent(this.redisKey, lastEventId); } return true; diff --git a/packages/shuttle/src/shuttle/redis.ts b/packages/shuttle/src/shuttle/redis.ts index 357ae53ec4..67c6840737 100644 --- a/packages/shuttle/src/shuttle/redis.ts +++ b/packages/shuttle/src/shuttle/redis.ts @@ -1,4 +1,4 @@ -import { Redis, RedisOptions } from "ioredis"; +import { Redis, Cluster, RedisOptions } from "ioredis"; export const getRedisClient = (redisUrl: string, redisOpts?: RedisOptions) => { const client = new Redis(redisUrl, { @@ -10,8 +10,8 @@ export const getRedisClient = (redisUrl: string, redisOpts?: RedisOptions) => { }; export class RedisClient { - public client: Redis; - constructor(client: Redis) { + public client: Redis | Cluster; + constructor(client: Redis | Cluster) { this.client = client; } static create(redisUrl: string, redisOpts?: RedisOptions) { diff --git a/scripts/hubble.sh b/scripts/hubble.sh index e918da7d6b..cbd45d284a 100755 --- a/scripts/hubble.sh +++ b/scripts/hubble.sh @@ -120,6 
+120,79 @@ fetch_latest_docker_compose_and_dashboard() { fetch_file_from_repo "$GRAFANA_INI_PATH" "grafana/grafana.ini" } +# Prompt for hub operator agreement +prompt_for_hub_operator_agreement() { + ( + env_file=".env" + + update_env_file() { + key="AGREE_NO_REWARDS_FOR_ME" + value="true" + temp_file="${env_file}.tmp" + + if [ -f "$env_file" ]; then + # File exists, update or append + updated=0 + while IFS= read -r line || [ -n "$line" ]; do + if [ "${line%%=*}" = "$key" ]; then + echo "$key=$value" >>"$temp_file" + updated=1 + else + echo "$line" >>"$temp_file" + fi + done <"$env_file" + + if [ $updated -eq 0 ]; then + echo "$key=$value" >>"$temp_file" + fi + + mv "$temp_file" "$env_file" + else + # File doesn't exist, create it + echo "$key=$value" >"$env_file" + fi + } + + prompt_agreement() { + tried=0 + while true; do + printf "⚠️ IMPORTANT: You will NOT get any rewards for running this hub\n" + printf "> Please type \"Yes\" to continue: " + read -r response + case $(printf "%s" "$response" | tr '[:upper:]' '[:lower:]') in + yes | y) + printf "✅ You have agreed to the terms of service. Proceeding...\n" + update_env_file + return 0 + ;; + *) + tried=$((tried + 1)) + if [ $tried -gt 10 ]; then + printf "❌ You have not agreed to the terms of service. Please run script again manually to agree and continue.\n" + exit 1 + fi + printf "[i] Incorrect input. Please try again.\n" + ;; + esac + done + } + + if grep -q "AGREE_NO_REWARDS_FOR_ME=true" "$env_file"; then + printf "✅ You have agreed to the terms of service. Proceeding...\n" + return 0 + else + # Check if stdin is a terminal + if [ -t 0 ]; then + prompt_agreement + return $? + fi + + printf "❌ You have not agreed to the terms of service. 
Please run script again manually to agree and continue.\n" + return 1 + fi + ) +} + validate_and_store() { local rpc_name=$1 local expected_chain_id=$2 @@ -493,6 +566,9 @@ reexec_as_root_if_needed() { # Call the function at the beginning of your script reexec_as_root_if_needed "$@" +# Prompt for hub operator agreement +prompt_for_hub_operator_agreement || exit $? + # Check for the "up" command-line argument if [ "$1" == "up" ]; then # Setup the docker-compose command