diff --git a/.changeset/popular-shoes-bathe.md b/.changeset/popular-shoes-bathe.md new file mode 100644 index 0000000000..af1864f2aa --- /dev/null +++ b/.changeset/popular-shoes-bathe.md @@ -0,0 +1,5 @@ +--- +"@farcaster/hubble": patch +--- + +feat: add unique peer map to sync engine to represent current active peers diff --git a/apps/hubble/src/hubble.ts b/apps/hubble/src/hubble.ts index 2072dcc5ba..7be20f7c0c 100644 --- a/apps/hubble/src/hubble.ts +++ b/apps/hubble/src/hubble.ts @@ -1260,6 +1260,7 @@ export class Hub implements HubInterface { await this.gossipNode.gossipContactInfo(contactInfo); statsd().gauge("peer_store.count", await this.gossipNode.peerStoreCount()); + statsd().gauge("active_peers.count", this.syncEngine.getActivePeerCount()); return Promise.resolve(ok(undefined)); } } @@ -1685,6 +1686,28 @@ export class Hub implements HubInterface { this.syncEngine.removeContactInfoForPeerId(connection.remotePeer.toString()); statsd().increment("peer_disconnect.count"); }); + + this.gossipNode.on("peerDiscovery", async (peerInfo) => { + // NB: The current code is not in use - we would require a source to emit peerDiscovery events. + // This is a placeholder for future use. 
+ + // Add discovered peer to sync engine + const peerId = peerInfo.id; + const peerAddresses = peerInfo.multiaddrs; + + // sorts addresses by Public IPs first + const addr = peerAddresses.sort((a, b) => + publicAddressesFirst({ multiaddr: a, isCertified: false }, { multiaddr: b, isCertified: false }), + )[0]; + if (addr === undefined) { + log.info( + { function: "peerDiscovery", peerId: peerId.toString() }, + "peer found but no address is available to request sync", + ); + return; + } + await this.gossipNode.addPeerToAddressBook(peerId, addr); + }); } /* -------------------------------------------------------------------------- */ diff --git a/apps/hubble/src/network/p2p/gossipNode.ts b/apps/hubble/src/network/p2p/gossipNode.ts index 029d6a5545..df3b9b43a0 100644 --- a/apps/hubble/src/network/p2p/gossipNode.ts +++ b/apps/hubble/src/network/p2p/gossipNode.ts @@ -1,5 +1,6 @@ import { PublishResult } from "@libp2p/interface-pubsub"; import { Worker } from "worker_threads"; +import { PeerInfo } from "@libp2p/interface-peer-info"; import { ContactInfoContent, FarcasterNetwork, @@ -50,6 +51,8 @@ interface NodeEvents { peerConnect: (connection: Connection) => void; /** Triggers when a peer disconnects and includes the libp2p Connection object */ peerDisconnect: (connection: Connection) => void; + /** Triggers when a peer is discovered and includes the libp2p PeerInfo object */ + peerDiscovery: (peerInfo: PeerInfo) => void; } /** Optional arguments provided when creating a Farcaster Gossip Node */ diff --git a/apps/hubble/src/network/sync/syncEngine.ts b/apps/hubble/src/network/sync/syncEngine.ts index 8c54f062e4..be879b0d76 100644 --- a/apps/hubble/src/network/sync/syncEngine.ts +++ b/apps/hubble/src/network/sync/syncEngine.ts @@ -52,6 +52,14 @@ import { PeerScore, PeerScorer } from "./peerScore.js"; import { getOnChainEvent } from "../../storage/db/onChainEvent.js"; import { getUserNameProof } from "../../storage/db/nameRegistryEvent.js"; import { MaxPriorityQueue } 
from "@datastructures-js/priority-queue"; +import { TTLMap } from "../../utils/ttl_map.js"; +import * as buffer from "node:buffer"; +import { peerIdFromString } from "@libp2p/peer-id"; + +// Time to live for peer contact info in the Peer TTLMap +const PEER_TTL_MAP_EXPIRATION_TIME_MILLISECONDS = 1000 * 60 * 60 * 24; // 24 hours +// Time interval to run cleanup on the Peer TTLMap +const PEER_TTL_MAP_CLEANUP_INTERVAL_MILLISECONDS = 1000 * 60 * 60 * 36; // 36 hours // Number of seconds to wait for the network to "settle" before syncing. We will only // attempt to sync messages that are older than this time. @@ -246,6 +254,7 @@ class SyncEngine extends TypedEmitter { private _syncProfiler?: SyncEngineProfiler; private currentHubPeerContacts: Map = new Map(); + private uniquePeerMap: TTLMap; // Number of messages waiting to get into the SyncTrie. private _syncTrieQ = 0; @@ -282,6 +291,10 @@ class SyncEngine extends TypedEmitter { ) { super(); + this.uniquePeerMap = new TTLMap( + PEER_TTL_MAP_EXPIRATION_TIME_MILLISECONDS, + PEER_TTL_MAP_CLEANUP_INTERVAL_MILLISECONDS, + ); this._db = rocksDb; this._trie = new MerkleTrie(rocksDb); this._l2EventsProvider = l2EventsProvider; @@ -541,12 +554,17 @@ class SyncEngine extends TypedEmitter { return this.currentHubPeerContacts.size; } + public getActivePeerCount(): number { + return this.uniquePeerMap.size(); + } + public getContactInfoForPeerId(peerId: string): PeerContact | undefined { return this.currentHubPeerContacts.get(peerId); } public getCurrentHubPeerContacts() { - return this.currentHubPeerContacts.values(); + // return non-expired active peers + return this.uniquePeerMap.getAll(); } public addContactInfoForPeerId( @@ -577,7 +595,9 @@ class SyncEngine extends TypedEmitter { }, "Updated Peer ContactInfo", ); - this.currentHubPeerContacts.set(peerId.toString(), { peerId, contactInfo }); + const id = peerId.toString(); + this.uniquePeerMap.set(id, contactInfo); + this.currentHubPeerContacts.set(id, { peerId, contactInfo }); 
return ok(undefined); } else { return err(new HubError("bad_request.duplicate", "recent contact update found for peer")); @@ -585,6 +605,11 @@ class SyncEngine extends TypedEmitter { } public removeContactInfoForPeerId(peerId: string) { + // NB: We don't remove the peer from the uniquePeerMap here - there can be many (valid) reasons to remove a + // peer from current hub peer contacts that are due to peer behavior but not indicative of whether that peer is active. + // For example, the peer may have a bad peer score or be disallowed for this hub, but it may still be an active peer. + // Since the unique peer map is meant to return the set of all active non-expired peers, we only remove + // peers from it when their TTL expires (i.e, if we don't get updates to their contact info for 24 hours) this.currentHubPeerContacts.delete(peerId); } @@ -636,7 +661,16 @@ class SyncEngine extends TypedEmitter { // Use a buffer of 5% of our messages so the peer with the highest message count does not get picked // disproportionately const messageThreshold = snapshotResult.value.numMessages * 0.95; - peers = Array.from(this.currentHubPeerContacts.values()).filter((p) => p.contactInfo.count > messageThreshold); + peers = this.uniquePeerMap + .getAll() + .filter((p) => this.currentHubPeerContacts.has(p[0]) && p[1].count > messageThreshold) + .map( + (p) => + ({ + peerId: peerIdFromString(p[0]), + contactInfo: p[1], + }) as PeerContact, + ); } if (peers.length === 0) { diff --git a/apps/hubble/src/rpc/server.ts b/apps/hubble/src/rpc/server.ts index 83ff5134b7..5cd0c5c8f0 100644 --- a/apps/hubble/src/rpc/server.ts +++ b/apps/hubble/src/rpc/server.ts @@ -495,7 +495,7 @@ export default class Server { return; } - const contactInfoArray = Array.from(currentHubPeerContacts).map((peerContact) => peerContact.contactInfo); + const contactInfoArray = Array.from(currentHubPeerContacts).map((peerContact) => peerContact[1]); callback(null, ContactInfoResponse.create({ contacts: contactInfoArray 
})); })(); }, diff --git a/apps/hubble/src/utils/ttl_map.test.ts b/apps/hubble/src/utils/ttl_map.test.ts new file mode 100644 index 0000000000..b6e0107dff --- /dev/null +++ b/apps/hubble/src/utils/ttl_map.test.ts @@ -0,0 +1,142 @@ +import { TTLMap } from "./ttl_map.js"; +import { jest } from "@jest/globals"; + +describe("TTLMap", () => { + jest.useFakeTimers(); + + test("size should accurately reflect non-expired entries", () => { + const map = new TTLMap(1000); + + expect(map.size()).toBe(0); + + map.set("a", 1); + expect(map.size()).toBe(1); + + map.set("b", 2); + expect(map.size()).toBe(2); + + jest.advanceTimersByTime(500); + map.set("c", 3); + expect(map.size()).toBe(3); + + jest.advanceTimersByTime(501); + expect(map.size()).toBe(1); + + map.get("a"); // This should trigger cleanup of expired entry + expect(map.size()).toBe(1); + }); + + test("set should not double count when updating existing entries", () => { + const map = new TTLMap(1000); + + map.set("a", 1); + expect(map.size()).toBe(1); + + map.set("a", 2); // Updating existing non-expired entry + expect(map.size()).toBe(1); + + jest.advanceTimersByTime(1001); + map.set("a", 3); // Updating expired entry + expect(map.size()).toBe(1); + }); + + test("delete should correctly update size for expired and non-expired entries", () => { + const map = new TTLMap(1000); + + map.set("a", 1); + map.set("b", 2); + expect(map.size()).toBe(2); + + map.delete("a"); + expect(map.size()).toBe(1); + + jest.advanceTimersByTime(1001); + expect(map.delete("b")).toBe(true); // Deleting expired entry + expect(map.size()).toBe(0); + }); + + test("get should update size when retrieving expired entries", () => { + const map = new TTLMap(1000); + + map.set("a", 1); + expect(map.size()).toBe(1); + + jest.advanceTimersByTime(1001); + expect(map.get("a")).toBeUndefined(); + expect(map.size()).toBe(0); + }); + + test("resetTTL should correctly handle expired and non-expired entries", () => { + const map = new TTLMap(1000); + + 
map.set("a", 1); + expect(map.size()).toBe(1); + + jest.advanceTimersByTime(500); + map.resetTTL("a"); + expect(map.size()).toBe(1); + + jest.advanceTimersByTime(750); + expect(map.get("a")).toBe(1); + expect(map.size()).toBe(1); + + jest.advanceTimersByTime(251); + map.resetTTL("a"); + expect(map.size()).toBe(1); + expect(map.get("a")).toBe(1); + }); + + test("getAll should return only non-expired entries", () => { + const map = new TTLMap(1000); + + map.set("a", 1); + map.set("b", 2); + map.set("c", 3); + + jest.advanceTimersByTime(500); + map.set("d", 4); + + jest.advanceTimersByTime(501); + + const allEntries = map.getAll(); + expect(allEntries).toHaveLength(1); + expect(allEntries[0]).toEqual(["d", 4]); + expect(map.size()).toBe(1); + }); + + test("clear should reset size to zero", () => { + const map = new TTLMap(1000); + + map.set("a", 1); + map.set("b", 2); + expect(map.size()).toBe(2); + + map.clear(); + expect(map.size()).toBe(0); + + map.set("c", 3); + expect(map.size()).toBe(1); + }); + + test("size should be accurate after multiple operations", () => { + const map = new TTLMap(1000); + + map.set("a", 1); + map.set("b", 2); + map.set("c", 3); + expect(map.size()).toBe(3); + + jest.advanceTimersByTime(500); + map.delete("b"); + map.set("d", 4); + expect(map.size()).toBe(3); + + jest.advanceTimersByTime(501); + map.get("a"); // This should trigger cleanup + expect(map.size()).toBe(1); + + map.set("e", 5); + map.resetTTL("d"); + expect(map.size()).toBe(2); + }); +}); diff --git a/apps/hubble/src/utils/ttl_map.ts b/apps/hubble/src/utils/ttl_map.ts new file mode 100644 index 0000000000..c84af24170 --- /dev/null +++ b/apps/hubble/src/utils/ttl_map.ts @@ -0,0 +1,152 @@ +/** + * TTLMap - A high-performance Time-To-Live Map implementation + * + * This data structure provides a Map-like interface with automatic expiration of entries. + * It is designed for scenarios with thousands of elements and frequent bulk retrieval operations. 
/**
 * TTLMap - a Map-like container whose entries expire a fixed time after they
 * were last written.
 *
 * Key features:
 * - `get`, `delete` and `resetTTL` are single Map operations; `set` additionally
 *   runs a full sweep at most once per cleanup interval
 * - `getAll` and `size` never report an expired entry, no matter how long ago
 *   the last sweep ran
 * - Coarse-grained TTL defined at the Map level
 * - Ability to reset TTL for individual entries
 *
 * An entry counts as expired once `Date.now() >= expiresAt`. Expired entries
 * are removed lazily: by `get` on that key, by the periodic sweep, or by
 * `size` when something may have expired.
 *
 * @template K The type of keys in the map
 * @template V The type of values in the map
 */
export class TTLMap<K, V> {
  private readonly map: Map<K, { value: V; expiresAt: number }>;
  private readonly ttl: number;
  private lastCleanup: number;
  private readonly cleanupInterval: number;
  /**
   * Lower bound on the smallest `expiresAt` of any stored entry (Infinity when
   * nothing is stored). It may be stale-low after a delete or TTL reset, which
   * only costs one extra sweep; it is never higher than the true minimum.
   */
  private earliestExpiry: number;

  /**
   * @param ttl Time-to-live in milliseconds for entries
   * @param cleanupInterval Optional interval for running the cleanup process (default: ttl / 2)
   */
  constructor(ttl: number, cleanupInterval?: number) {
    this.map = new Map();
    this.ttl = ttl;
    this.lastCleanup = Date.now();
    // `||` is deliberate: an explicit 0 keeps falling back to the default interval.
    this.cleanupInterval = cleanupInterval || Math.floor(ttl / 2);
    this.earliestExpiry = Infinity;
  }

  /**
   * Sets a key-value pair in the map with the current TTL
   * @param key The key to set
   * @param value The value to set
   * @returns The TTLMap instance for method chaining
   */
  set(key: K, value: V): this {
    const now = Date.now();
    const expiresAt = now + this.ttl;

    this.map.set(key, { value, expiresAt });
    if (expiresAt < this.earliestExpiry) {
      this.earliestExpiry = expiresAt;
    }
    this.cleanupIfNeeded(now);
    return this;
  }

  /**
   * Retrieves a value from the map if it exists and hasn't expired.
   * An expired entry found under `key` is removed as a side effect.
   * @param key The key to retrieve
   * @returns The value if found and not expired, undefined otherwise
   */
  get(key: K): V | undefined {
    const entry = this.map.get(key);
    if (entry === undefined) {
      return undefined;
    }
    if (entry.expiresAt > Date.now()) {
      return entry.value;
    }
    this.map.delete(key);
    return undefined;
  }

  /**
   * Deletes a key-value pair from the map
   * @param key The key to delete
   * @returns true if an entry was stored under the key (even one that had
   *          already expired but was not yet swept), false otherwise
   */
  delete(key: K): boolean {
    return this.map.delete(key);
  }

  /**
   * Resets the TTL for a given key without changing its value.
   * An expired entry that has not been swept yet is revived by this call.
   * @param key The key to reset the TTL for
   * @returns true if the key exists and TTL was reset, false otherwise
   */
  resetTTL(key: K): boolean {
    const entry = this.map.get(key);
    if (entry === undefined) {
      return false;
    }
    entry.expiresAt = Date.now() + this.ttl;
    // Only matters if the wall clock moved backwards; keeps the bound a true lower bound.
    if (entry.expiresAt < this.earliestExpiry) {
      this.earliestExpiry = entry.expiresAt;
    }
    return true;
  }

  /**
   * Retrieves all non-expired entries from the map, in the map's insertion order
   * @returns An array of [key, value] pairs for all non-expired entries
   */
  getAll(): [K, V][] {
    const now = Date.now();
    this.cleanupIfNeeded(now);

    const live: [K, V][] = [];
    for (const [key, entry] of this.map) {
      if (entry.expiresAt > now) {
        live.push([key, entry.value]);
      }
    }
    return live;
  }

  /**
   * Clears all entries from the map
   */
  clear(): void {
    this.map.clear();
    this.earliestExpiry = Infinity;
  }

  /**
   * Returns the number of non-expired entries in the map.
   * The count is exact at the time of the call: if any entry may have expired,
   * expired entries are swept first instead of waiting for the cleanup interval.
   * @returns The number of non-expired entries in the map
   */
  size(): number {
    const now = Date.now();
    if (now >= this.earliestExpiry) {
      this.sweep(now);
    }
    return this.map.size;
  }

  /**
   * Performs cleanup of expired entries if the cleanup interval has elapsed
   * This method is called internally and doesn't need to be invoked manually
   * @param now Current time in milliseconds, as returned by Date.now()
   */
  private cleanupIfNeeded(now: number): void {
    if (now - this.lastCleanup > this.cleanupInterval) {
      this.sweep(now);
    }
  }

  /**
   * Removes every entry with `expiresAt <= now`, recomputes the earliest
   * remaining expiry and records `now` as the time of the last cleanup.
   * @param now Current time in milliseconds, as returned by Date.now()
   */
  private sweep(now: number): void {
    let earliest = Infinity;
    for (const [key, entry] of this.map) {
      if (entry.expiresAt <= now) {
        this.map.delete(key);
      } else if (entry.expiresAt < earliest) {
        earliest = entry.expiresAt;
      }
    }
    this.earliestExpiry = earliest;
    this.lastCleanup = now;
  }
}