From fdcc3b52282fd11e0e5dcea567d8642b7667fb65 Mon Sep 17 00:00:00 2001 From: Wasif Iqbal Date: Tue, 2 Jul 2024 19:05:18 -0500 Subject: [PATCH] chore: add stats for bundle message delays, stale contact info (#2118) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Motivation - Need better visibility into message delays and disambiguate between bundle delay and message delay - Ensure stale contacts aren't disseminating through the network ## Change Summary - Add statsd metrics for earliest and latest message timestamp in bundle and bundle creation timestamp. All timestamps are in farcaster time - Add statsd metric for old contact info being gossiped ## Merge Checklist _Choose all relevant options below by adding an `x` now or at any time before submitting for review_ - [x] PR title adheres to the [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) standard - [x] PR has a [changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets) - [x] PR has been tagged with a change label(s) (i.e. documentation, feature, bugfix, or chore) - [ ] PR includes [documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs) if necessary. - [x] All [commits have been signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits) ## Additional Context If this is a relatively large or complex change, provide more details here that will help reviewers --- ## PR-Codex overview This PR focuses on enhancing message bundle submission in the Hubble application by adding stats for message delays and contact info staleness. ### Detailed summary - Added stats for bundle message delays and stale contact info - Updated `submitMessageBundle` method parameters in multiple files - Updated `submitMessageBundle` method in `Hub` class to include additional stats and parameters > ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your question}` --- .changeset/dull-dancers-float.md | 5 ++ apps/hubble/src/hubble.ts | 67 +++++++++++++++++-- apps/hubble/src/network/p2p/gossipNode.ts | 2 +- apps/hubble/src/network/sync/syncEngine.ts | 2 +- .../hubble/src/test/e2e/hubbleNetwork.test.ts | 6 +- apps/hubble/src/test/mocks.ts | 2 + 6 files changed, 72 insertions(+), 12 deletions(-) create mode 100644 .changeset/dull-dancers-float.md diff --git a/.changeset/dull-dancers-float.md b/.changeset/dull-dancers-float.md new file mode 100644 index 0000000000..440b5ba934 --- /dev/null +++ b/.changeset/dull-dancers-float.md @@ -0,0 +1,5 @@ +--- +"@farcaster/hubble": patch +--- + +chore: add stats for bundle message delays, stale contact info diff --git a/apps/hubble/src/hubble.ts b/apps/hubble/src/hubble.ts index 77cf7f5c9b..caca74e9e9 100644 --- a/apps/hubble/src/hubble.ts +++ b/apps/hubble/src/hubble.ts @@ -5,6 +5,7 @@ import { ContactInfoContent, ContactInfoContentBody, FarcasterNetwork, + fromFarcasterTime, getInsecureHubRpcClient, getSSLHubRpcClient, GossipAddressInfo, @@ -17,6 +18,7 @@ import { Message, OnChainEvent, onChainEventTypeToJSON, + toFarcasterTime, UserNameProof, validations, } from "@farcaster/hub-nodejs"; @@ -131,7 +133,12 @@ export interface HubInterface { identity: string; hubOperatorFid?: number; submitMessage(message: Message, source?: HubSubmitSource): HubAsyncResult; - submitMessageBundle(messageBundle: MessageBundle, source?: HubSubmitSource): Promise[]>; + submitMessageBundle( + creationFarcasterTime: number, + messageBundle: MessageBundle, + source?: HubSubmitSource, + peerId?: PeerId, + ): Promise[]>; validateMessage(message: Message): HubAsyncResult; submitUserNameProof(usernameProof: UserNameProof, source?: HubSubmitSource): HubAsyncResult; submitOnChainEvent(event: OnChainEvent, source?: HubSubmitSource): HubAsyncResult; @@ -1340,14 +1347,13 @@ export class Hub implements HubInterface { await this.gossipNode.reportValid(msgId, peerIdFromString(source.toString()).toBytes(), false); } } - statsd().timing("gossip.message_delay", gossipMessageDelay); const mergeResult = result.isOk() ? "success" : "failure"; - statsd().timing(`gossip.message_delay.${mergeResult}`, gossipMessageDelay); + statsd().timing("gossip.message_delay", gossipMessageDelay, { status: mergeResult }); return result.map(() => undefined); } else if (gossipMessage.messageBundle) { const bundle = gossipMessage.messageBundle; - const results = await this.submitMessageBundle(bundle, "gossip"); + const results = await this.submitMessageBundle(messageFirstGossipedTime, bundle, "gossip", source); // If at least one is Ok, report as valid const atLeastOneOk = results.find((r) => r.isOk()); @@ -1414,6 +1420,7 @@ export class Hub implements HubInterface { // Don't process messages that are too old if (message.timestamp && message.timestamp < Date.now() - MAX_CONTACT_INFO_AGE_MS) { + statsd().increment("gossip.contact_info.too_old", 1); log.debug({ message }, "contact info message is too old"); return false; } @@ -1666,7 +1673,12 @@ export class Hub implements HubInterface { /* RPC Handler API */ /* -------------------------------------------------------------------------- */ - async submitMessageBundle(messageBundle: MessageBundle, source?: HubSubmitSource): Promise[]> { + async submitMessageBundle( + creationFarcasterTime: number, + messageBundle: MessageBundle, + source?: HubSubmitSource, + peerId?: PeerId, + ): Promise[]> { if (this.syncEngine.syncTrieQSize > MAX_SYNCTRIE_QUEUE_SIZE) { log.warn({ syncTrieQSize: this.syncEngine.syncTrieQSize }, "SubmitMessage rejected: Sync trie queue is full"); // Since we're rejecting the full bundle, return an error for each message @@ -1679,6 +1691,8 @@ export class Hub implements HubInterface { const allResults: Map> = new Map(); const dedupedMessages: { i: number; message: Message }[] = []; + let earliestTimestamp = Infinity; + let latestTimestamp = -Infinity; if (source === "gossip") { // Go over all the messages and see if they are in the DB. If they are, don't bother processing them const messagesExist = await areMessagesInDb(this.rocksDB, messageBundle.messages); @@ -1688,12 +1702,51 @@ export class Hub implements HubInterface { log.debug({ source }, "submitMessageBundle rejected: Message already exists"); allResults.set(i, err(new HubError("bad_request.duplicate", "message has already been merged"))); } else { - dedupedMessages.push({ i, message: ensureMessageData(messageBundle.messages[i] as Message) }); + const message = ensureMessageData(messageBundle.messages[i] as Message); + earliestTimestamp = Math.min(earliestTimestamp, message.data?.timestamp ?? Infinity); + latestTimestamp = Math.max(latestTimestamp, message.data?.timestamp ?? -Infinity); + dedupedMessages.push({ i, message }); } } } else { - dedupedMessages.push(...messageBundle.messages.map((message, i) => ({ i, message: ensureMessageData(message) }))); + const initialResult = { + earliest: Infinity, + latest: -Infinity, + deduped: [] as { i: number; message: Message }[], + }; + const { deduped, earliest, latest } = messageBundle.messages.reduce((acc, message, i) => { + const messageData = ensureMessageData(message); + acc.earliest = Math.min(acc.earliest, messageData.data?.timestamp ?? Infinity); + acc.latest = Math.max(acc.latest, messageData.data?.timestamp ?? -Infinity); + acc.deduped.push({ i, message: messageData }); + return acc; + }, initialResult); + earliestTimestamp = earliest; + latestTimestamp = latest; + dedupedMessages.push(...deduped); } + const tags: { [key: string]: string } = { + ...(source ? { source } : {}), + ...(peerId ? { peer_id: peerId.toString() } : {}), + }; + + statsd().gauge("hub.submit_message_bundle.size", dedupedMessages.length, tags); + statsd().gauge( + "hub.submit_message_bundle.earliest_timestamp_ms", + fromFarcasterTime(earliestTimestamp).unwrapOr(0), + tags, + ); + statsd().gauge( + "hub.submit_message_bundle.latest_timestamp_ms", + fromFarcasterTime(latestTimestamp).unwrapOr(0), + tags, + ); + statsd().gauge( + "hub.submit_message_bundle.creation_time_ms", + fromFarcasterTime(creationFarcasterTime).unwrapOr(0), + tags, + ); + statsd().gauge("hub.submit_message_bundle.max_delay_ms", creationFarcasterTime - earliestTimestamp, tags); // Merge the messages const mergeResults = await this.engine.mergeMessages(dedupedMessages.map((m) => m.message)); diff --git a/apps/hubble/src/network/p2p/gossipNode.ts b/apps/hubble/src/network/p2p/gossipNode.ts index 77e5412c02..029d6a5545 100644 --- a/apps/hubble/src/network/p2p/gossipNode.ts +++ b/apps/hubble/src/network/p2p/gossipNode.ts @@ -34,7 +34,7 @@ import { sleep } from "../../utils/crypto.js"; /** The maximum number of pending merge messages before we drop new incoming gossip or sync messages. */ export const MAX_SYNCTRIE_QUEUE_SIZE = 100_000; /** The TTL for messages in the seen cache */ -export const GOSSIP_SEEN_TTL = 1000 * 60 * 5; +export const GOSSIP_SEEN_TTL = 1000 * 60 * 5; // 5 minutes /** The maximum amount of time to dial a peer in libp2p network in milliseconds */ export const LIBP2P_CONNECT_TIMEOUT_MS = 2000; diff --git a/apps/hubble/src/network/sync/syncEngine.ts b/apps/hubble/src/network/sync/syncEngine.ts index 3d97562213..8c54f062e4 100644 --- a/apps/hubble/src/network/sync/syncEngine.ts +++ b/apps/hubble/src/network/sync/syncEngine.ts @@ -1187,7 +1187,7 @@ class SyncEngine extends TypedEmitter { statsd().gauge("syncengine.merge_q", this._syncMergeQ); const startTime = Date.now(); - const results = await this._hub.submitMessageBundle(MessageBundle.create({ messages }), "sync"); + const results = await this._hub.submitMessageBundle(startTime, MessageBundle.create({ messages }), "sync"); for (let i = 0; i < results.length; i++) { const result = results[i] as HubResult; diff --git a/apps/hubble/src/test/e2e/hubbleNetwork.test.ts b/apps/hubble/src/test/e2e/hubbleNetwork.test.ts index 6c85ffa3dd..1dc5d01777 100644 --- a/apps/hubble/src/test/e2e/hubbleNetwork.test.ts +++ b/apps/hubble/src/test/e2e/hubbleNetwork.test.ts @@ -147,7 +147,7 @@ describe("hubble gossip and sync tests", () => { messages: castAddMessages, }); // Submit it to hub1 via rpc so it gets gossiped - const bundleMergeResult = await hub1.submitMessageBundle(messageBundle, "rpc"); + const bundleMergeResult = await hub1.submitMessageBundle(Date.now(), messageBundle, "rpc"); expect(bundleMergeResult.length).toEqual(5); for (let i = 0; i < 5; i++) { expect(bundleMergeResult[i]?.isOk()).toBeTruthy(); @@ -167,7 +167,7 @@ describe("hubble gossip and sync tests", () => { } // Submitting the same bundle again should result in a duplicate error if we submit via gossip - const errResult2 = await hub1.submitMessageBundle(messageBundle, "gossip"); + const errResult2 = await hub1.submitMessageBundle(Date.now(), messageBundle, "gossip"); // Expect all the messages to be duplicates for (let i = 0; i < 5; i++) { expect(errResult2[i]?.isErr()).toBeTruthy(); @@ -193,7 +193,7 @@ describe("hubble gossip and sync tests", () => { hash: new Uint8Array([0, 1, 2, 1, 2]), messages: castAddMessages2, }); - const errResult3 = await hub1.submitMessageBundle(messageBundle2, "gossip"); + const errResult3 = await hub1.submitMessageBundle(Date.now(), messageBundle2, "gossip"); expect(errResult3.length).toEqual(5); expect(errResult3[0]?.isOk()).toBeTruthy(); expect(errResult3[1]?.isErr()).toBeTruthy(); diff --git a/apps/hubble/src/test/mocks.ts b/apps/hubble/src/test/mocks.ts index e166c3262a..77e2d11ef7 100644 --- a/apps/hubble/src/test/mocks.ts +++ b/apps/hubble/src/test/mocks.ts @@ -43,8 +43,10 @@ export class MockHub implements HubInterface { } async submitMessageBundle( + _creationTimeMs: number, messageBundle: MessageBundle, source?: HubSubmitSource | undefined, + _peerId?: PeerId, ): Promise[]> { const results: HubResult[] = []; for (const message of messageBundle.messages) {