Skip to content

Commit

Permalink
[feat] Support fname and onChainEvent SyncIds (farcasterxyz#1414)
Browse files Browse the repository at this point in the history
* feat: Support fname and onChainEvent SyncIds

* changeset
  • Loading branch information
sanjayprabhu authored Sep 22, 2023
1 parent dd4aec3 commit 4b99edd
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 86 deletions.
6 changes: 6 additions & 0 deletions .changeset/silly-beers-fail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@farcaster/core": patch
"@farcaster/hubble": patch
---

feat: support fname and onchain event syncids
2 changes: 1 addition & 1 deletion apps/hubble/src/network/sync/merkleTrie.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ class MerkleTrie {
(e) => e as HubError,
)();
if (message.isOk() && message.value.hash.length === HASH_LENGTH) {
await this.insert(new SyncId(message.value));
await this.insert(SyncId.fromMessage(message.value));
count += 1;
if (count % 10_000 === 0) {
log.info({ count }, "Rebuilding Merkle Trie");
Expand Down
5 changes: 4 additions & 1 deletion apps/hubble/src/network/sync/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { HubResult, MessagesResponse, SyncIds, TrieNodeMetadataResponse, TrieNod
import Engine from "../../storage/engine/index.js";
import { NodeMetadata } from "./merkleTrie.js";
import SyncEngine from "./syncEngine.js";
import { SyncId } from "./syncId.js";

export class MockRpcClient {
engine: Engine;
Expand Down Expand Up @@ -71,7 +72,9 @@ export class MockRpcClient {

async getAllMessagesBySyncIds(request: SyncIds): Promise<HubResult<MessagesResponse>> {
this.getAllMessagesBySyncIdsCalls.push(request);
const messagesResult = await this.syncEngine.getAllMessagesBySyncIds(request.syncIds);
const messagesResult = await this.syncEngine.getAllMessagesBySyncIds(
request.syncIds.map((s) => SyncId.fromBytes(s)),
);
return messagesResult.map((messages) => {
this.getAllMessagesBySyncIdsReturns += messages.length;
return MessagesResponse.create({ messages: messages ?? [] });
Expand Down
36 changes: 21 additions & 15 deletions apps/hubble/src/network/sync/multiPeerSyncEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ describe("Multi peer sync engine", () => {
expect(await syncEngine2.trie.rootHash()).toEqual(await syncEngine1.trie.rootHash());

// Make sure the castAdd is in the trie
expect(await syncEngine1.trie.exists(new SyncId(castAdd))).toBeTruthy();
expect(await syncEngine2.trie.exists(new SyncId(castAdd))).toBeTruthy();
expect(await syncEngine1.trie.exists(SyncId.fromMessage(castAdd))).toBeTruthy();
expect(await syncEngine2.trie.exists(SyncId.fromMessage(castAdd))).toBeTruthy();

const castRemove = await Factories.CastRemoveMessage.create(
{
Expand All @@ -268,10 +268,10 @@ describe("Multi peer sync engine", () => {

expect(result.isOk()).toBeTruthy();

const castRemoveId = new SyncId(castRemove);
const castRemoveId = SyncId.fromMessage(castRemove);
expect(await syncEngine1.trie.exists(castRemoveId)).toBeTruthy();
// The trie should not contain the castAdd anymore
expect(await syncEngine1.trie.exists(new SyncId(castAdd))).toBeFalsy();
expect(await syncEngine1.trie.exists(SyncId.fromMessage(castAdd))).toBeFalsy();

// Syncing engine2 --> engine1 should do nothing, even though engine2 has the castAdd and it has been removed
// from engine1.
Expand All @@ -297,7 +297,7 @@ describe("Multi peer sync engine", () => {
await sleepWhile(() => syncEngine2.syncTrieQSize > 0, SLEEPWHILE_TIMEOUT);

expect(await syncEngine2.trie.exists(castRemoveId)).toBeTruthy();
expect(await syncEngine2.trie.exists(new SyncId(castAdd))).toBeFalsy();
expect(await syncEngine2.trie.exists(SyncId.fromMessage(castAdd))).toBeFalsy();

expect(await syncEngine2.trie.rootHash()).toEqual(await syncEngine1.trie.rootHash());

Expand Down Expand Up @@ -364,7 +364,7 @@ describe("Multi peer sync engine", () => {
await syncEngine2.performSync("engine1", (await syncEngine1.getSnapshot())._unsafeUnwrap(), clientForServer1);

expect(fetchMessagesSpy).toHaveBeenCalledTimes(1);
expect(fetchMessagesSpy).toHaveBeenCalledWith([Buffer.from(new SyncId(msgs[0] as Message).syncId())]);
expect(fetchMessagesSpy).toHaveBeenCalledWith([SyncId.fromMessage(msgs[0] as Message)]);

// Also assert the root hashes are the same
expect(await syncEngine2.trie.rootHash()).toEqual(await syncEngine1.trie.rootHash());
Expand Down Expand Up @@ -447,15 +447,18 @@ describe("Multi peer sync engine", () => {
await syncEngine2.start();

// Add a message to engine1 synctrie, but not to the engine itself.
syncEngine1.trie.insert(new SyncId(castAdd));
syncEngine1.trie.insert(SyncId.fromMessage(castAdd));

// Attempt to sync engine2 <-- engine1.
await syncEngine2.performSync("engine1", (await syncEngine1.getSnapshot())._unsafeUnwrap(), clientForServer1);

// Since the message is actually missing, it should be a no-op, and the missing message should disappear
// from the sync trie
await sleepWhile(async () => (await syncEngine2.trie.exists(new SyncId(castAdd))) === true, SLEEPWHILE_TIMEOUT);
expect(await syncEngine2.trie.exists(new SyncId(castAdd))).toBeFalsy();
await sleepWhile(
async () => (await syncEngine2.trie.exists(SyncId.fromMessage(castAdd))) === true,
SLEEPWHILE_TIMEOUT,
);
expect(await syncEngine2.trie.exists(SyncId.fromMessage(castAdd))).toBeFalsy();

// The root hashes should be the same, since nothing actually happened
expect(await syncEngine1.trie.items()).toEqual(await syncEngine2.trie.items());
Expand Down Expand Up @@ -489,7 +492,7 @@ describe("Multi peer sync engine", () => {
expect(await engine1.mergeMessage(castAdd1)).toBeTruthy();

// CastAdd2 is added only to the sync trie, but is missing from the engine
await syncEngine2.trie.insert(new SyncId(castAdd2));
await syncEngine2.trie.insert(SyncId.fromMessage(castAdd2));

// Wait for the sync trie to be updated
await sleepWhile(async () => (await syncEngine2.trie.items()) !== 2, SLEEPWHILE_TIMEOUT);
Expand All @@ -499,12 +502,12 @@ describe("Multi peer sync engine", () => {
await syncEngine2.performSync("engine1", (await syncEngine1.getSnapshot())._unsafeUnwrap(), clientForServer1);

// The sync engine should realize that castAdd2 is not in it's engine, so it should be removed from the sync trie
await sleepWhile(async () => (await syncEngine2.trie.exists(new SyncId(castAdd2))) === true, 1000);
await sleepWhile(async () => (await syncEngine2.trie.exists(SyncId.fromMessage(castAdd2))) === true, 1000);

expect(await syncEngine2.trie.exists(new SyncId(castAdd2))).toBeFalsy();
expect(await syncEngine2.trie.exists(SyncId.fromMessage(castAdd2))).toBeFalsy();

// but the castAdd1 should still be there
expect(await syncEngine2.trie.exists(new SyncId(castAdd1))).toBeTruthy();
expect(await syncEngine2.trie.exists(SyncId.fromMessage(castAdd1))).toBeTruthy();

await syncEngine2.stop();
await engine2.stop();
Expand All @@ -529,7 +532,7 @@ describe("Multi peer sync engine", () => {
await engine2.mergeMessage(castAdd);

// ...but we'll corrupt the sync trie by pretending that the signerAdd message is missing
syncEngine2.trie.deleteBySyncId(new SyncId(castAdd));
syncEngine2.trie.deleteBySyncId(SyncId.fromMessage(castAdd));

// syncengine2 should be empty
expect(await syncEngine2.trie.items()).toEqual(0);
Expand All @@ -542,7 +545,10 @@ describe("Multi peer sync engine", () => {

// Since the message isn't actually missing, it should be a no-op, and the missing message should
// get added back to the sync trie
await sleepWhile(async () => (await syncEngine2.trie.exists(new SyncId(castAdd))) === false, SLEEPWHILE_TIMEOUT);
await sleepWhile(
async () => (await syncEngine2.trie.exists(SyncId.fromMessage(castAdd))) === false,
SLEEPWHILE_TIMEOUT,
);

// The root hashes should now be the same
expect(await syncEngine1.trie.items()).toEqual(await syncEngine2.trie.items());
Expand Down
26 changes: 13 additions & 13 deletions apps/hubble/src/network/sync/syncEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ describe("SyncEngine", () => {

// Two messages (signerEvent + castAdd) was added to the trie
expect((await syncEngine.trie.items()) - existingItems).toEqual(1);
expect(await syncEngine.trie.exists(new SyncId(castAdd))).toBeTruthy();
expect(await syncEngine.trie.exists(SyncId.fromMessage(castAdd))).toBeTruthy();
});

test("trie is not updated on merge failure", async () => {
Expand All @@ -125,7 +125,7 @@ describe("SyncEngine", () => {
await sleepWhile(() => syncEngine.syncTrieQSize > 0, SLEEPWHILE_TIMEOUT);

expect(await syncEngine.trie.items()).toEqual(0);
expect(await syncEngine.trie.exists(new SyncId(castAdd))).toBeFalsy();
expect(await syncEngine.trie.exists(SyncId.fromMessage(castAdd))).toBeFalsy();
});

test("trie is updated when a message is removed", async () => {
Expand All @@ -150,18 +150,18 @@ describe("SyncEngine", () => {
// Wait for the trie to be updated
await sleepWhile(() => syncEngine.syncTrieQSize > 0, SLEEPWHILE_TIMEOUT);

const id = new SyncId(castRemove);
const id = SyncId.fromMessage(castRemove);
expect(await syncEngine.trie.exists(id)).toBeTruthy();

const allMessages = await syncEngine.getAllMessagesBySyncIds([id.syncId()]);
const allMessages = await syncEngine.getAllMessagesBySyncIds([id]);
expect(allMessages.isOk()).toBeTruthy();
expect(allMessages._unsafeUnwrap()[0]?.data?.type).toEqual(MessageType.CAST_REMOVE);

// The trie should contain the message remove
expect(await syncEngine.trie.exists(id)).toBeTruthy();

// The trie should not contain the castAdd anymore
expect(await syncEngine.trie.exists(new SyncId(castAdd))).toBeFalsy();
expect(await syncEngine.trie.exists(SyncId.fromMessage(castAdd))).toBeFalsy();
});

test("trie is updated for username proof messages", async () => {
Expand Down Expand Up @@ -200,12 +200,12 @@ describe("SyncEngine", () => {

// SignerAdd and Username proof is added to the trie
expect((await syncEngine.trie.items()) - existingItems).toEqual(1);
expect(await syncEngine.trie.exists(new SyncId(proof))).toBeTruthy();
expect(await syncEngine.trie.exists(SyncId.fromMessage(proof))).toBeTruthy();
});

test("getAllMessages returns empty with invalid syncId", async () => {
expect(await syncEngine.trie.items()).toEqual(0);
const result = await syncEngine.getAllMessagesBySyncIds([new SyncId(castAdd).syncId()]);
const result = await syncEngine.getAllMessagesBySyncIds([SyncId.fromMessage(castAdd)]);
expect(result.isOk()).toBeTruthy();
expect(result._unsafeUnwrap()[0]?.data).toBeUndefined();
expect(result._unsafeUnwrap()[0]?.hash.length).toEqual(0);
Expand Down Expand Up @@ -410,9 +410,9 @@ describe("SyncEngine", () => {
// Make sure all messages exist
expect(await syncEngine2.trie.items()).toEqual(3);
expect(await syncEngine2.trie.rootHash()).toEqual(await syncEngine.trie.rootHash());
expect(await syncEngine2.trie.exists(new SyncId(messages[0] as Message))).toBeTruthy();
expect(await syncEngine2.trie.exists(new SyncId(messages[1] as Message))).toBeTruthy();
expect(await syncEngine2.trie.exists(new SyncId(messages[2] as Message))).toBeTruthy();
expect(await syncEngine2.trie.exists(SyncId.fromMessage(messages[0] as Message))).toBeTruthy();
expect(await syncEngine2.trie.exists(SyncId.fromMessage(messages[1] as Message))).toBeTruthy();
expect(await syncEngine2.trie.exists(SyncId.fromMessage(messages[2] as Message))).toBeTruthy();

await syncEngine2.stop();
});
Expand All @@ -434,9 +434,9 @@ describe("SyncEngine", () => {
// Make sure all messages exist
expect(await syncEngine2.trie.items()).toEqual(3);
expect(await syncEngine2.trie.rootHash()).toEqual(await syncEngine.trie.rootHash());
expect(await syncEngine2.trie.exists(new SyncId(messages[0] as Message))).toBeTruthy();
expect(await syncEngine2.trie.exists(new SyncId(messages[1] as Message))).toBeTruthy();
expect(await syncEngine2.trie.exists(new SyncId(messages[2] as Message))).toBeTruthy();
expect(await syncEngine2.trie.exists(SyncId.fromMessage(messages[0] as Message))).toBeTruthy();
expect(await syncEngine2.trie.exists(SyncId.fromMessage(messages[1] as Message))).toBeTruthy();
expect(await syncEngine2.trie.exists(SyncId.fromMessage(messages[2] as Message))).toBeTruthy();

await syncEngine2.stop();
},
Expand Down
36 changes: 23 additions & 13 deletions apps/hubble/src/network/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { err, ok, Result, ResultAsync } from "neverthrow";
import { TypedEmitter } from "tiny-typed-emitter";
import { APP_VERSION, FARCASTER_VERSION, Hub, HubInterface } from "../../hubble.js";
import { MerkleTrie, NodeMetadata } from "./merkleTrie.js";
import { formatPrefix, prefixToTimestamp, SyncId, timestampToPaddedTimestampPrefix } from "./syncId.js";
import { formatPrefix, prefixToTimestamp, SyncId, SyncIdType, timestampToPaddedTimestampPrefix } from "./syncId.js";
import { TrieSnapshot } from "./trieNode.js";
import { getManyMessages } from "../../storage/db/message.js";
import RocksDB from "../../storage/db/rocksdb.js";
Expand Down Expand Up @@ -268,9 +268,9 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
}

/** Revoke the individual syncIDs in the Sync Trie */
public async revokeSyncIds(syncIds: Uint8Array[]) {
public async revokeSyncIds(syncIds: SyncId[]) {
for (const syncId of syncIds) {
await this._trie.deleteByBytes(syncId);
await this._trie.deleteByBytes(syncId.syncId());
}
}

Expand Down Expand Up @@ -638,8 +638,14 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
return fullSyncResult;
}

async getAllMessagesBySyncIds(syncIds: Uint8Array[]): HubAsyncResult<Message[]> {
const msgPKs = syncIds.map((syncId) => SyncId.pkFromSyncId(syncId));
async getAllMessagesBySyncIds(syncIds: SyncId[]): HubAsyncResult<Message[]> {
const msgPKs: Buffer[] = [];
for (const syncId of syncIds) {
const unpacked = syncId.unpack();
if (unpacked.type === SyncIdType.Message) {
msgPKs.push(Buffer.from(unpacked.primaryKey));
}
}
return ResultAsync.fromPromise(getManyMessages(this._db, msgPKs), (e) => e as HubError);
}

Expand Down Expand Up @@ -681,7 +687,8 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
// Make sure that the messages are actually for the SyncIDs
const syncIdHashes = new Set(
syncIds
.map((syncId) => bytesToHexString(SyncId.hashFromSyncId(syncId)).unwrapOr(""))
.map((syncId) => SyncId.unpack(syncId))
.map((syncId) => (syncId.type === SyncIdType.Message ? bytesToHexString(syncId.hash).unwrapOr("") : ""))
.filter((str) => str !== ""),
);

Expand Down Expand Up @@ -773,7 +780,7 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
} else if (result.error.errCode === "bad_request.duplicate") {
// This message has been merged into the DB, but for some reason is not in the Trie.
// Just update the trie.
await this.trie.insert(new SyncId(msg));
await this.trie.insert(SyncId.fromMessage(msg));
mergeResults.push(result);
errCount += 1;
} else {
Expand Down Expand Up @@ -901,10 +908,13 @@ class SyncEngine extends TypedEmitter<SyncEvents> {

if (ourNode?.prefix) {
const suspectSyncIDs = await this.trie.getAllValues(ourNode?.prefix);
const messagesResult = await this.getAllMessagesBySyncIds(suspectSyncIDs);
const messageSyncIds = suspectSyncIDs
.map((s) => SyncId.fromBytes(s))
.filter((syncId) => syncId.unpack().type === SyncIdType.Message);
const messagesResult = await this.getAllMessagesBySyncIds(messageSyncIds);

if (messagesResult.isOk()) {
const corruptedSyncIds = this.findCorruptedSyncIDs(messagesResult.value, suspectSyncIDs);
const corruptedSyncIds = this.findCorruptedSyncIDs(messagesResult.value, messageSyncIds);

if (corruptedSyncIds.length > 0) {
log.warn(
Expand Down Expand Up @@ -971,21 +981,21 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
}
}

public findCorruptedSyncIDs(messages: Message[], syncIds: Uint8Array[]): Uint8Array[] {
public findCorruptedSyncIDs(messages: Message[], syncIds: SyncId[]): SyncId[] {
return messages
.map((message, i) => (message.data === undefined || message.hash.length === 0 ? syncIds[i] : undefined))
.filter((i) => i !== undefined) as Uint8Array[];
.filter((i) => i !== undefined) as SyncId[];
}

/** ---------------------------------------------------------------------------------- */
/** Trie Methods */
/** ---------------------------------------------------------------------------------- */
public async addMessage(message: Message): Promise<void> {
await this._trie.insert(new SyncId(message));
await this._trie.insert(SyncId.fromMessage(message));
}

public async removeMessage(message: Message): Promise<void> {
await this._trie.deleteBySyncId(new SyncId(message));
await this._trie.deleteBySyncId(SyncId.fromMessage(message));
}

public async getTrieNodeMetadata(prefix: Uint8Array): Promise<NodeMetadata | undefined> {
Expand Down
58 changes: 51 additions & 7 deletions apps/hubble/src/network/sync/syncId.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Factories, FarcasterNetwork, Message } from "@farcaster/hub-nodejs";
import { SyncId } from "./syncId.js";
import { makeMessagePrimaryKeyFromMessage } from "../../storage/db/message.js";
import { FNameSyncId, MessageSyncId, OnChainEventSyncId, SyncId, SyncIdType, TIMESTAMP_LENGTH } from "./syncId.js";
import { makeFidKey, makeMessagePrimaryKeyFromMessage } from "../../storage/db/message.js";
import { FID_BYTES, RootPrefix } from "../../storage/db/types.js";

let message: Message;

Expand All @@ -14,13 +15,56 @@ beforeAll(async () => {

describe("SyncId", () => {
test("succeeds", async () => {
const syncId = new SyncId(message).syncId();
const syncId = SyncId.fromMessage(message).syncId();
expect(syncId).toBeDefined();
});

test("Test pkFromSyncId", async () => {
// Create a new castAdd
const castAdd1 = await Factories.CastAddMessage.create({ data: { fid, network } }, { transient: { signer } });
expect(makeMessagePrimaryKeyFromMessage(castAdd1)).toEqual(SyncId.pkFromSyncId(new SyncId(castAdd1).syncId()));
describe("Message syncIds", () => {
test("creates and unpacks correctly from message", async () => {
const castAddMessage = await Factories.CastAddMessage.create(
{ data: { fid, network } },
{ transient: { signer } },
);
const syncId = SyncId.fromMessage(castAddMessage).syncId();
const unpackedSyncId = SyncId.unpack(syncId) as MessageSyncId;
expect(unpackedSyncId.type).toEqual(SyncIdType.Message);
expect(unpackedSyncId.fid).toEqual(fid);
expect(unpackedSyncId.hash).toEqual(castAddMessage.hash);
expect(unpackedSyncId.primaryKey).toEqual(makeMessagePrimaryKeyFromMessage(castAddMessage));
});
});

describe("FName syncIds", () => {
test("creates and unpacks correctly from message", async () => {
const fnameProof = Factories.UserNameProof.build();
const syncId = SyncId.fromFName(fnameProof).syncId();
const unpackedSyncId = SyncId.unpack(syncId) as FNameSyncId;
expect(unpackedSyncId.type).toEqual(SyncIdType.FName);
expect(unpackedSyncId.fid).toEqual(fnameProof.fid);
expect(unpackedSyncId.name).toEqual(fnameProof.name);
});
});

describe("OnChainEvent syncIds", () => {
test("creates and unpacks correctly from message", async () => {
const onChainEvent = Factories.IdRegistryOnChainEvent.build();
const syncId = SyncId.fromOnChainEvent(onChainEvent).syncId();
const unpackedSyncId = SyncId.unpack(syncId) as OnChainEventSyncId;
expect(unpackedSyncId.type).toEqual(SyncIdType.OnChainEvent);
expect(unpackedSyncId.fid).toEqual(onChainEvent.fid);
expect(unpackedSyncId.blockNumber).toEqual(onChainEvent.blockNumber);
});
});

describe("unknown syncIds", () => {
test("always returns 0 fid", async () => {
const bytes = new Uint8Array(TIMESTAMP_LENGTH + 1 + FID_BYTES + 1);
bytes.set([RootPrefix.HubEvents], TIMESTAMP_LENGTH);
const fid = Factories.Fid.build();
bytes.set(makeFidKey(fid), TIMESTAMP_LENGTH + 1);
const unpackedSyncId = SyncId.unpack(bytes);
expect(unpackedSyncId.type).toEqual(SyncIdType.Unknown);
expect(unpackedSyncId.fid).toEqual(0);
});
});
});
Loading

0 comments on commit 4b99edd

Please sign in to comment.