Skip to content

Commit

Permalink
feat: add unique peer map to sync engine to represent current active …
Browse files Browse the repository at this point in the history
…peers (farcasterxyz#2120)

## Motivation

- Current peer store keeps track of all hubs that successfully connected
- However, peers may connect and never be seen again due to churn or
change in Peer ID
- We add TTL Map of peers that only adds peers, never deletes, and
expires any peer that hasn't gossiped updates in 24 hours

## Change Summary

- add unique peer map to sync engine to represent current active peers

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.
- [x] All [commits have been
signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits)

## Additional Context

If this is a relatively large or complex change, provide more details
here that will help reviewers

<!-- start pr-codex -->

---

## PR-Codex overview
This PR introduces a unique peer map to track active peers in the sync
engine of the Hubble app. It also adds a high-performance TTLMap for
efficient entry expiration management.

### Detailed summary
- Added unique peer map for active peers in the sync engine
- Implemented TTLMap for automatic expiration of entries
- Updated handling of peer discovery events
- Enhanced peer contact management in the sync engine

> The following files were skipped due to too many changes:
`apps/hubble/src/utils/ttl_map.ts`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
Wazzymandias authored Jul 3, 2024
1 parent 094fe86 commit 2a82b3d
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 4 deletions.
5 changes: 5 additions & 0 deletions .changeset/popular-shoes-bathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

feat: add unique peer map to sync engine to represent current active peers
23 changes: 23 additions & 0 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,7 @@ export class Hub implements HubInterface {

await this.gossipNode.gossipContactInfo(contactInfo);
statsd().gauge("peer_store.count", await this.gossipNode.peerStoreCount());
statsd().gauge("active_peers.count", this.syncEngine.getActivePeerCount());
return Promise.resolve(ok(undefined));
}
}
Expand Down Expand Up @@ -1685,6 +1686,28 @@ export class Hub implements HubInterface {
this.syncEngine.removeContactInfoForPeerId(connection.remotePeer.toString());
statsd().increment("peer_disconnect.count");
});

this.gossipNode.on("peerDiscovery", async (peerInfo) => {
// NB: The current code is not in use - we would require a source to emit peerDiscovery events.
// This is a placeholder for future use.

// Add discovered peer to sync engine
const peerId = peerInfo.id;
const peerAddresses = peerInfo.multiaddrs;

// sorts addresses by Public IPs first
const addr = peerAddresses.sort((a, b) =>
publicAddressesFirst({ multiaddr: a, isCertified: false }, { multiaddr: b, isCertified: false }),
)[0];
if (addr === undefined) {
log.info(
{ function: "peerDiscovery", peerId: peerId.toString() },
"peer found but no address is available to request sync",
);
return;
}
await this.gossipNode.addPeerToAddressBook(peerId, addr);
});
}

/* -------------------------------------------------------------------------- */
Expand Down
3 changes: 3 additions & 0 deletions apps/hubble/src/network/p2p/gossipNode.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { PublishResult } from "@libp2p/interface-pubsub";
import { Worker } from "worker_threads";
import { PeerInfo } from "@libp2p/interface-peer-info";
import {
ContactInfoContent,
FarcasterNetwork,
Expand Down Expand Up @@ -50,6 +51,8 @@ interface NodeEvents {
peerConnect: (connection: Connection) => void;
/** Triggers when a peer disconnects and includes the libp2p Connection object */
peerDisconnect: (connection: Connection) => void;
/** Triggers when a peer is discovered and includes the libp2p PeerInfo object */
peerDiscovery: (peerInfo: PeerInfo) => void;
}

/** Optional arguments provided when creating a Farcaster Gossip Node */
Expand Down
40 changes: 37 additions & 3 deletions apps/hubble/src/network/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ import { PeerScore, PeerScorer } from "./peerScore.js";
import { getOnChainEvent } from "../../storage/db/onChainEvent.js";
import { getUserNameProof } from "../../storage/db/nameRegistryEvent.js";
import { MaxPriorityQueue } from "@datastructures-js/priority-queue";
import { TTLMap } from "../../utils/ttl_map.js";
import * as buffer from "node:buffer";
import { peerIdFromString } from "@libp2p/peer-id";

// Time to live for peer contact info in the Peer TTLMap
const PEER_TTL_MAP_EXPIRATION_TIME_MILLISECONDS = 1000 * 60 * 60 * 24; // 24 hours
// Time interval to run cleanup on the Peer TTLMap
const PEER_TTL_MAP_CLEANUP_INTERVAL_MILLISECONDS = 1000 * 60 * 60 * 36; // 36 hours

// Number of seconds to wait for the network to "settle" before syncing. We will only
// attempt to sync messages that are older than this time.
Expand Down Expand Up @@ -246,6 +254,7 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
private _syncProfiler?: SyncEngineProfiler;

private currentHubPeerContacts: Map<string, PeerContact> = new Map();
private uniquePeerMap: TTLMap<string, ContactInfoContentBody>;

// Number of messages waiting to get into the SyncTrie.
private _syncTrieQ = 0;
Expand Down Expand Up @@ -282,6 +291,10 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
) {
super();

this.uniquePeerMap = new TTLMap<string, ContactInfoContentBody>(
PEER_TTL_MAP_EXPIRATION_TIME_MILLISECONDS,
PEER_TTL_MAP_CLEANUP_INTERVAL_MILLISECONDS,
);
this._db = rocksDb;
this._trie = new MerkleTrie(rocksDb);
this._l2EventsProvider = l2EventsProvider;
Expand Down Expand Up @@ -541,12 +554,17 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
return this.currentHubPeerContacts.size;
}

public getActivePeerCount(): number {
return this.uniquePeerMap.size();
}

public getContactInfoForPeerId(peerId: string): PeerContact | undefined {
return this.currentHubPeerContacts.get(peerId);
}

public getCurrentHubPeerContacts() {
return this.currentHubPeerContacts.values();
// return non-expired active peers
return this.uniquePeerMap.getAll();
}

public addContactInfoForPeerId(
Expand Down Expand Up @@ -577,14 +595,21 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
},
"Updated Peer ContactInfo",
);
this.currentHubPeerContacts.set(peerId.toString(), { peerId, contactInfo });
const id = peerId.toString();
this.uniquePeerMap.set(id, contactInfo);
this.currentHubPeerContacts.set(id, { peerId, contactInfo });
return ok(undefined);
} else {
return err(new HubError("bad_request.duplicate", "recent contact update found for peer"));
}
}

public removeContactInfoForPeerId(peerId: string) {
// NB: We don't remove the peer from the uniquePeerMap here - there can be many (valid) reasons to remove a
// peer from current hub peer contacts that are due to peer behavior but not indicative of whether that peer is active.
// For example, the peer may have a bad peer score or be disallowed for this hub, but it may still be an active peer.
// Since the unique peer map is meant to return the set of all active non-expired peers, we only remove
// peers from it when their TTL expires (i.e, if we don't get updates to their contact info for 24 hours)
this.currentHubPeerContacts.delete(peerId);
}

Expand Down Expand Up @@ -636,7 +661,16 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
// Use a buffer of 5% of our messages so the peer with the highest message count does not get picked
// disproportionately
const messageThreshold = snapshotResult.value.numMessages * 0.95;
peers = Array.from(this.currentHubPeerContacts.values()).filter((p) => p.contactInfo.count > messageThreshold);
peers = this.uniquePeerMap
.getAll()
.filter((p) => this.currentHubPeerContacts.has(p[0]) && p[1].count > messageThreshold)
.map(
(p) =>
({
peerId: peerIdFromString(p[0]),
contactInfo: p[1],
}) as PeerContact,
);
}

if (peers.length === 0) {
Expand Down
2 changes: 1 addition & 1 deletion apps/hubble/src/rpc/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ export default class Server {
return;
}

const contactInfoArray = Array.from(currentHubPeerContacts).map((peerContact) => peerContact.contactInfo);
const contactInfoArray = Array.from(currentHubPeerContacts).map((peerContact) => peerContact[1]);
callback(null, ContactInfoResponse.create({ contacts: contactInfoArray }));
})();
},
Expand Down
142 changes: 142 additions & 0 deletions apps/hubble/src/utils/ttl_map.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { TTLMap } from "./ttl_map.js";
import { jest } from "@jest/globals";

describe("TTLMap", () => {
jest.useFakeTimers();

test("size should accurately reflect non-expired entries", () => {
const map = new TTLMap<string, number>(1000);

expect(map.size()).toBe(0);

map.set("a", 1);
expect(map.size()).toBe(1);

map.set("b", 2);
expect(map.size()).toBe(2);

jest.advanceTimersByTime(500);
map.set("c", 3);
expect(map.size()).toBe(3);

jest.advanceTimersByTime(501);
expect(map.size()).toBe(1);

map.get("a"); // This should trigger cleanup of expired entry
expect(map.size()).toBe(1);
});

test("set should not double count when updating existing entries", () => {
const map = new TTLMap<string, number>(1000);

map.set("a", 1);
expect(map.size()).toBe(1);

map.set("a", 2); // Updating existing non-expired entry
expect(map.size()).toBe(1);

jest.advanceTimersByTime(1001);
map.set("a", 3); // Updating expired entry
expect(map.size()).toBe(1);
});

test("delete should correctly update size for expired and non-expired entries", () => {
const map = new TTLMap<string, number>(1000);

map.set("a", 1);
map.set("b", 2);
expect(map.size()).toBe(2);

map.delete("a");
expect(map.size()).toBe(1);

jest.advanceTimersByTime(1001);
expect(map.delete("b")).toBe(true); // Deleting expired entry
expect(map.size()).toBe(0);
});

test("get should update size when retrieving expired entries", () => {
const map = new TTLMap<string, number>(1000);

map.set("a", 1);
expect(map.size()).toBe(1);

jest.advanceTimersByTime(1001);
expect(map.get("a")).toBeUndefined();
expect(map.size()).toBe(0);
});

test("resetTTL should correctly handle expired and non-expired entries", () => {
const map = new TTLMap<string, number>(1000);

map.set("a", 1);
expect(map.size()).toBe(1);

jest.advanceTimersByTime(500);
map.resetTTL("a");
expect(map.size()).toBe(1);

jest.advanceTimersByTime(750);
expect(map.get("a")).toBe(1);
expect(map.size()).toBe(1);

jest.advanceTimersByTime(251);
map.resetTTL("a");
expect(map.size()).toBe(1);
expect(map.get("a")).toBe(1);
});

test("getAll should return only non-expired entries", () => {
const map = new TTLMap<string, number>(1000);

map.set("a", 1);
map.set("b", 2);
map.set("c", 3);

jest.advanceTimersByTime(500);
map.set("d", 4);

jest.advanceTimersByTime(501);

const allEntries = map.getAll();
expect(allEntries).toHaveLength(1);
expect(allEntries[0]).toEqual(["d", 4]);
expect(map.size()).toBe(1);
});

test("clear should reset size to zero", () => {
const map = new TTLMap<string, number>(1000);

map.set("a", 1);
map.set("b", 2);
expect(map.size()).toBe(2);

map.clear();
expect(map.size()).toBe(0);

map.set("c", 3);
expect(map.size()).toBe(1);
});

test("size should be accurate after multiple operations", () => {
const map = new TTLMap<string, number>(1000);

map.set("a", 1);
map.set("b", 2);
map.set("c", 3);
expect(map.size()).toBe(3);

jest.advanceTimersByTime(500);
map.delete("b");
map.set("d", 4);
expect(map.size()).toBe(3);

jest.advanceTimersByTime(501);
map.get("a"); // This should trigger cleanup
expect(map.size()).toBe(1);

map.set("e", 5);
map.resetTTL("d");
expect(map.size()).toBe(2);
});
});
Loading

0 comments on commit 2a82b3d

Please sign in to comment.