Skip to content

Commit

Permalink
chore: add stats for bundle message delays, stale contact info (farca…
Browse files Browse the repository at this point in the history
…sterxyz#2118)

## Motivation

- Need better visibility into message delays and disambiguate between
bundle delay and message delay
- Ensure stale contacts aren't disseminating through the network

## Change Summary

- Add statsd metrics for earliest and latest message timestamp in bundle
and bundle creation timestamp. All timestamps are in farcaster time
- Add statsd metric for old contact info being gossiped

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.
- [x] All [commits have been
signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits)

## Additional Context

If this is a relatively large or complex change, provide more details
here that will help reviewers


<!-- start pr-codex -->

---

## PR-Codex overview
This PR focuses on enhancing message bundle submission in the Hubble
application by adding stats for message delays and contact info
staleness.

### Detailed summary
- Added stats for bundle message delays and stale contact info
- Updated `submitMessageBundle` method parameters in multiple files
- Updated `submitMessageBundle` method in `Hub` class to include
additional stats and parameters

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
Wazzymandias authored Jul 3, 2024
1 parent 59bcfe7 commit fdcc3b5
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 12 deletions.
5 changes: 5 additions & 0 deletions .changeset/dull-dancers-float.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

chore: add stats for bundle message delays, stale contact info
67 changes: 60 additions & 7 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
ContactInfoContent,
ContactInfoContentBody,
FarcasterNetwork,
fromFarcasterTime,
getInsecureHubRpcClient,
getSSLHubRpcClient,
GossipAddressInfo,
Expand All @@ -17,6 +18,7 @@ import {
Message,
OnChainEvent,
onChainEventTypeToJSON,
toFarcasterTime,
UserNameProof,
validations,
} from "@farcaster/hub-nodejs";
Expand Down Expand Up @@ -131,7 +133,12 @@ export interface HubInterface {
identity: string;
hubOperatorFid?: number;
submitMessage(message: Message, source?: HubSubmitSource): HubAsyncResult<number>;
submitMessageBundle(messageBundle: MessageBundle, source?: HubSubmitSource): Promise<HubResult<number>[]>;
submitMessageBundle(
creationFarcasterTime: number,
messageBundle: MessageBundle,
source?: HubSubmitSource,
peerId?: PeerId,
): Promise<HubResult<number>[]>;
validateMessage(message: Message): HubAsyncResult<Message>;
submitUserNameProof(usernameProof: UserNameProof, source?: HubSubmitSource): HubAsyncResult<number>;
submitOnChainEvent(event: OnChainEvent, source?: HubSubmitSource): HubAsyncResult<number>;
Expand Down Expand Up @@ -1340,14 +1347,13 @@ export class Hub implements HubInterface {
await this.gossipNode.reportValid(msgId, peerIdFromString(source.toString()).toBytes(), false);
}
}
statsd().timing("gossip.message_delay", gossipMessageDelay);
const mergeResult = result.isOk() ? "success" : "failure";
statsd().timing(`gossip.message_delay.${mergeResult}`, gossipMessageDelay);
statsd().timing("gossip.message_delay", gossipMessageDelay, { status: mergeResult });

return result.map(() => undefined);
} else if (gossipMessage.messageBundle) {
const bundle = gossipMessage.messageBundle;
const results = await this.submitMessageBundle(bundle, "gossip");
const results = await this.submitMessageBundle(messageFirstGossipedTime, bundle, "gossip", source);

// If at least one is Ok, report as valid
const atLeastOneOk = results.find((r) => r.isOk());
Expand Down Expand Up @@ -1414,6 +1420,7 @@ export class Hub implements HubInterface {

// Don't process messages that are too old
if (message.timestamp && message.timestamp < Date.now() - MAX_CONTACT_INFO_AGE_MS) {
statsd().increment("gossip.contact_info.too_old", 1);
log.debug({ message }, "contact info message is too old");
return false;
}
Expand Down Expand Up @@ -1666,7 +1673,12 @@ export class Hub implements HubInterface {
/* RPC Handler API */
/* -------------------------------------------------------------------------- */

async submitMessageBundle(messageBundle: MessageBundle, source?: HubSubmitSource): Promise<HubResult<number>[]> {
async submitMessageBundle(
creationFarcasterTime: number,
messageBundle: MessageBundle,
source?: HubSubmitSource,
peerId?: PeerId,
): Promise<HubResult<number>[]> {
if (this.syncEngine.syncTrieQSize > MAX_SYNCTRIE_QUEUE_SIZE) {
log.warn({ syncTrieQSize: this.syncEngine.syncTrieQSize }, "SubmitMessage rejected: Sync trie queue is full");
// Since we're rejecting the full bundle, return an error for each message
Expand All @@ -1679,6 +1691,8 @@ export class Hub implements HubInterface {
const allResults: Map<number, HubResult<number>> = new Map();

const dedupedMessages: { i: number; message: Message }[] = [];
let earliestTimestamp = Infinity;
let latestTimestamp = -Infinity;
if (source === "gossip") {
// Go over all the messages and see if they are in the DB. If they are, don't bother processing them
const messagesExist = await areMessagesInDb(this.rocksDB, messageBundle.messages);
Expand All @@ -1688,12 +1702,51 @@ export class Hub implements HubInterface {
log.debug({ source }, "submitMessageBundle rejected: Message already exists");
allResults.set(i, err(new HubError("bad_request.duplicate", "message has already been merged")));
} else {
dedupedMessages.push({ i, message: ensureMessageData(messageBundle.messages[i] as Message) });
const message = ensureMessageData(messageBundle.messages[i] as Message);
earliestTimestamp = Math.min(earliestTimestamp, message.data?.timestamp ?? Infinity);
latestTimestamp = Math.max(latestTimestamp, message.data?.timestamp ?? -Infinity);
dedupedMessages.push({ i, message });
}
}
} else {
dedupedMessages.push(...messageBundle.messages.map((message, i) => ({ i, message: ensureMessageData(message) })));
const initialResult = {
earliest: Infinity,
latest: -Infinity,
deduped: [] as { i: number; message: Message }[],
};
const { deduped, earliest, latest } = messageBundle.messages.reduce((acc, message, i) => {
const messageData = ensureMessageData(message);
acc.earliest = Math.min(acc.earliest, messageData.data?.timestamp ?? Infinity);
acc.latest = Math.max(acc.latest, messageData.data?.timestamp ?? -Infinity);
acc.deduped.push({ i, message: messageData });
return acc;
}, initialResult);
earliestTimestamp = earliest;
latestTimestamp = latest;
dedupedMessages.push(...deduped);
}
const tags: { [key: string]: string } = {
...(source ? { source } : {}),
...(peerId ? { peer_id: peerId.toString() } : {}),
};

statsd().gauge("hub.submit_message_bundle.size", dedupedMessages.length, tags);
statsd().gauge(
"hub.submit_message_bundle.earliest_timestamp_ms",
fromFarcasterTime(earliestTimestamp).unwrapOr(0),
tags,
);
statsd().gauge(
"hub.submit_message_bundle.latest_timestamp_ms",
fromFarcasterTime(latestTimestamp).unwrapOr(0),
tags,
);
statsd().gauge(
"hub.submit_message_bundle.creation_time_ms",
fromFarcasterTime(creationFarcasterTime).unwrapOr(0),
tags,
);
statsd().gauge("hub.submit_message_bundle.max_delay_ms", creationFarcasterTime - earliestTimestamp, tags);

// Merge the messages
const mergeResults = await this.engine.mergeMessages(dedupedMessages.map((m) => m.message));
Expand Down
2 changes: 1 addition & 1 deletion apps/hubble/src/network/p2p/gossipNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import { sleep } from "../../utils/crypto.js";
/** The maximum number of pending merge messages before we drop new incoming gossip or sync messages. */
export const MAX_SYNCTRIE_QUEUE_SIZE = 100_000;
/** The TTL for messages in the seen cache */
export const GOSSIP_SEEN_TTL = 1000 * 60 * 5;
export const GOSSIP_SEEN_TTL = 1000 * 60 * 5; // 5 minutes

/** The maximum amount of time to dial a peer in libp2p network in milliseconds */
export const LIBP2P_CONNECT_TIMEOUT_MS = 2000;
Expand Down
2 changes: 1 addition & 1 deletion apps/hubble/src/network/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
statsd().gauge("syncengine.merge_q", this._syncMergeQ);

const startTime = Date.now();
const results = await this._hub.submitMessageBundle(MessageBundle.create({ messages }), "sync");
const results = await this._hub.submitMessageBundle(startTime, MessageBundle.create({ messages }), "sync");

for (let i = 0; i < results.length; i++) {
const result = results[i] as HubResult<number>;
Expand Down
6 changes: 3 additions & 3 deletions apps/hubble/src/test/e2e/hubbleNetwork.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ describe("hubble gossip and sync tests", () => {
messages: castAddMessages,
});
// Submit it to hub1 via rpc so it gets gossiped
const bundleMergeResult = await hub1.submitMessageBundle(messageBundle, "rpc");
const bundleMergeResult = await hub1.submitMessageBundle(Date.now(), messageBundle, "rpc");
expect(bundleMergeResult.length).toEqual(5);
for (let i = 0; i < 5; i++) {
expect(bundleMergeResult[i]?.isOk()).toBeTruthy();
Expand All @@ -167,7 +167,7 @@ describe("hubble gossip and sync tests", () => {
}

// Submitting the same bundle again should result in a duplicate error if we submit via gossip
const errResult2 = await hub1.submitMessageBundle(messageBundle, "gossip");
const errResult2 = await hub1.submitMessageBundle(Date.now(), messageBundle, "gossip");
// Expect all the messages to be duplicates
for (let i = 0; i < 5; i++) {
expect(errResult2[i]?.isErr()).toBeTruthy();
Expand All @@ -193,7 +193,7 @@ describe("hubble gossip and sync tests", () => {
hash: new Uint8Array([0, 1, 2, 1, 2]),
messages: castAddMessages2,
});
const errResult3 = await hub1.submitMessageBundle(messageBundle2, "gossip");
const errResult3 = await hub1.submitMessageBundle(Date.now(), messageBundle2, "gossip");
expect(errResult3.length).toEqual(5);
expect(errResult3[0]?.isOk()).toBeTruthy();
expect(errResult3[1]?.isErr()).toBeTruthy();
Expand Down
2 changes: 2 additions & 0 deletions apps/hubble/src/test/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ export class MockHub implements HubInterface {
}

async submitMessageBundle(
_creationTimeMs: number,
messageBundle: MessageBundle,
source?: HubSubmitSource | undefined,
_peerId?: PeerId,
): Promise<HubResult<number>[]> {
const results: HubResult<number>[] = [];
for (const message of messageBundle.messages) {
Expand Down

0 comments on commit fdcc3b5

Please sign in to comment.