diff --git a/.changeset/silly-beers-fail.md b/.changeset/silly-beers-fail.md new file mode 100644 index 0000000000..6eba41d84b --- /dev/null +++ b/.changeset/silly-beers-fail.md @@ -0,0 +1,6 @@ +--- +"@farcaster/core": patch +"@farcaster/hubble": patch +--- + +feat: support fname and onchain event syncids diff --git a/apps/hubble/src/network/sync/merkleTrie.ts b/apps/hubble/src/network/sync/merkleTrie.ts index 73365e4abf..8fdf1644bc 100644 --- a/apps/hubble/src/network/sync/merkleTrie.ts +++ b/apps/hubble/src/network/sync/merkleTrie.ts @@ -220,7 +220,7 @@ class MerkleTrie { (e) => e as HubError, )(); if (message.isOk() && message.value.hash.length === HASH_LENGTH) { - await this.insert(new SyncId(message.value)); + await this.insert(SyncId.fromMessage(message.value)); count += 1; if (count % 10_000 === 0) { log.info({ count }, "Rebuilding Merkle Trie"); diff --git a/apps/hubble/src/network/sync/mock.ts b/apps/hubble/src/network/sync/mock.ts index c4ad082a63..9f7f992173 100644 --- a/apps/hubble/src/network/sync/mock.ts +++ b/apps/hubble/src/network/sync/mock.ts @@ -5,6 +5,7 @@ import { HubResult, MessagesResponse, SyncIds, TrieNodeMetadataResponse, TrieNod import Engine from "../../storage/engine/index.js"; import { NodeMetadata } from "./merkleTrie.js"; import SyncEngine from "./syncEngine.js"; +import { SyncId } from "./syncId.js"; export class MockRpcClient { engine: Engine; @@ -71,7 +72,9 @@ export class MockRpcClient { async getAllMessagesBySyncIds(request: SyncIds): Promise> { this.getAllMessagesBySyncIdsCalls.push(request); - const messagesResult = await this.syncEngine.getAllMessagesBySyncIds(request.syncIds); + const messagesResult = await this.syncEngine.getAllMessagesBySyncIds( + request.syncIds.map((s) => SyncId.fromBytes(s)), + ); return messagesResult.map((messages) => { this.getAllMessagesBySyncIdsReturns += messages.length; return MessagesResponse.create({ messages: messages ?? [] }); diff --git a/apps/hubble/src/network/sync/multiPeerSyncEngine.test.ts b/apps/hubble/src/network/sync/multiPeerSyncEngine.test.ts index 72ca2a35a8..6b2a4ab6bb 100644 --- a/apps/hubble/src/network/sync/multiPeerSyncEngine.test.ts +++ b/apps/hubble/src/network/sync/multiPeerSyncEngine.test.ts @@ -247,8 +247,8 @@ describe("Multi peer sync engine", () => { expect(await syncEngine2.trie.rootHash()).toEqual(await syncEngine1.trie.rootHash()); // Make sure the castAdd is in the trie - expect(await syncEngine1.trie.exists(new SyncId(castAdd))).toBeTruthy(); - expect(await syncEngine2.trie.exists(new SyncId(castAdd))).toBeTruthy(); + expect(await syncEngine1.trie.exists(SyncId.fromMessage(castAdd))).toBeTruthy(); + expect(await syncEngine2.trie.exists(SyncId.fromMessage(castAdd))).toBeTruthy(); const castRemove = await Factories.CastRemoveMessage.create( { @@ -268,10 +268,10 @@ describe("Multi peer sync engine", () => { expect(result.isOk()).toBeTruthy(); - const castRemoveId = new SyncId(castRemove); + const castRemoveId = SyncId.fromMessage(castRemove); expect(await syncEngine1.trie.exists(castRemoveId)).toBeTruthy(); // The trie should not contain the castAdd anymore - expect(await syncEngine1.trie.exists(new SyncId(castAdd))).toBeFalsy(); + expect(await syncEngine1.trie.exists(SyncId.fromMessage(castAdd))).toBeFalsy(); // Syncing engine2 --> engine1 should do nothing, even though engine2 has the castAdd and it has been removed // from engine1. @@ -297,7 +297,7 @@ describe("Multi peer sync engine", () => { await sleepWhile(() => syncEngine2.syncTrieQSize > 0, SLEEPWHILE_TIMEOUT); expect(await syncEngine2.trie.exists(castRemoveId)).toBeTruthy(); - expect(await syncEngine2.trie.exists(new SyncId(castAdd))).toBeFalsy(); + expect(await syncEngine2.trie.exists(SyncId.fromMessage(castAdd))).toBeFalsy(); expect(await syncEngine2.trie.rootHash()).toEqual(await syncEngine1.trie.rootHash()); @@ -364,7 +364,7 @@ describe("Multi peer sync engine", () => { await syncEngine2.performSync("engine1", (await syncEngine1.getSnapshot())._unsafeUnwrap(), clientForServer1); expect(fetchMessagesSpy).toHaveBeenCalledTimes(1); - expect(fetchMessagesSpy).toHaveBeenCalledWith([Buffer.from(new SyncId(msgs[0] as Message).syncId())]); + expect(fetchMessagesSpy).toHaveBeenCalledWith([SyncId.fromMessage(msgs[0] as Message)]); // Also assert the root hashes are the same expect(await syncEngine2.trie.rootHash()).toEqual(await syncEngine1.trie.rootHash()); @@ -447,15 +447,18 @@ describe("Multi peer sync engine", () => { await syncEngine2.start(); // Add a message to engine1 synctrie, but not to the engine itself. - syncEngine1.trie.insert(new SyncId(castAdd)); + syncEngine1.trie.insert(SyncId.fromMessage(castAdd)); // Attempt to sync engine2 <-- engine1. await syncEngine2.performSync("engine1", (await syncEngine1.getSnapshot())._unsafeUnwrap(), clientForServer1); // Since the message is actually missing, it should be a no-op, and the missing message should disappear // from the sync trie - await sleepWhile(async () => (await syncEngine2.trie.exists(new SyncId(castAdd))) === true, SLEEPWHILE_TIMEOUT); - expect(await syncEngine2.trie.exists(new SyncId(castAdd))).toBeFalsy(); + await sleepWhile( + async () => (await syncEngine2.trie.exists(SyncId.fromMessage(castAdd))) === true, + SLEEPWHILE_TIMEOUT, + ); + expect(await syncEngine2.trie.exists(SyncId.fromMessage(castAdd))).toBeFalsy(); // The root hashes should be the same, since nothing actually happened expect(await syncEngine1.trie.items()).toEqual(await syncEngine2.trie.items()); @@ -489,7 +492,7 @@ describe("Multi peer sync engine", () => { expect(await engine1.mergeMessage(castAdd1)).toBeTruthy(); // CastAdd2 is added only to the sync trie, but is missing from the engine - await syncEngine2.trie.insert(new SyncId(castAdd2)); + await syncEngine2.trie.insert(SyncId.fromMessage(castAdd2)); // Wait for the sync trie to be updated await sleepWhile(async () => (await syncEngine2.trie.items()) !== 2, SLEEPWHILE_TIMEOUT); @@ -499,12 +502,12 @@ describe("Multi peer sync engine", () => { await syncEngine2.performSync("engine1", (await syncEngine1.getSnapshot())._unsafeUnwrap(), clientForServer1); // The sync engine should realize that castAdd2 is not in it's engine, so it should be removed from the sync trie - await sleepWhile(async () => (await syncEngine2.trie.exists(new SyncId(castAdd2))) === true, 1000); + await sleepWhile(async () => (await syncEngine2.trie.exists(SyncId.fromMessage(castAdd2))) === true, 1000); - expect(await syncEngine2.trie.exists(new SyncId(castAdd2))).toBeFalsy(); + expect(await syncEngine2.trie.exists(SyncId.fromMessage(castAdd2))).toBeFalsy(); // but the castAdd1 should still be there - expect(await syncEngine2.trie.exists(new SyncId(castAdd1))).toBeTruthy(); + expect(await syncEngine2.trie.exists(SyncId.fromMessage(castAdd1))).toBeTruthy(); await syncEngine2.stop(); await engine2.stop(); @@ -529,7 +532,7 @@ describe("Multi peer sync engine", () => { await engine2.mergeMessage(castAdd); // ...but we'll corrupt the sync trie by pretending that the signerAdd message is missing - syncEngine2.trie.deleteBySyncId(new SyncId(castAdd)); + syncEngine2.trie.deleteBySyncId(SyncId.fromMessage(castAdd)); // syncengine2 should be empty expect(await syncEngine2.trie.items()).toEqual(0); @@ -542,7 +545,10 @@ describe("Multi peer sync engine", () => { // Since the message isn't actually missing, it should be a no-op, and the missing message should // get added back to the sync trie - await sleepWhile(async () => (await syncEngine2.trie.exists(new SyncId(castAdd))) === false, SLEEPWHILE_TIMEOUT); + await sleepWhile( + async () => (await syncEngine2.trie.exists(SyncId.fromMessage(castAdd))) === false, + SLEEPWHILE_TIMEOUT, + ); // The root hashes should now be the same expect(await syncEngine1.trie.items()).toEqual(await syncEngine2.trie.items()); diff --git a/apps/hubble/src/network/sync/syncEngine.test.ts b/apps/hubble/src/network/sync/syncEngine.test.ts index e44b566735..c1364c3edc 100644 --- a/apps/hubble/src/network/sync/syncEngine.test.ts +++ b/apps/hubble/src/network/sync/syncEngine.test.ts @@ -110,7 +110,7 @@ describe("SyncEngine", () => { // Two messages (signerEvent + castAdd) was added to the trie expect((await syncEngine.trie.items()) - existingItems).toEqual(1); - expect(await syncEngine.trie.exists(new SyncId(castAdd))).toBeTruthy(); + expect(await syncEngine.trie.exists(SyncId.fromMessage(castAdd))).toBeTruthy(); }); test("trie is not updated on merge failure", async () => { @@ -125,7 +125,7 @@ describe("SyncEngine", () => { await sleepWhile(() => syncEngine.syncTrieQSize > 0, SLEEPWHILE_TIMEOUT); expect(await syncEngine.trie.items()).toEqual(0); - expect(await syncEngine.trie.exists(new SyncId(castAdd))).toBeFalsy(); + expect(await syncEngine.trie.exists(SyncId.fromMessage(castAdd))).toBeFalsy(); }); test("trie is updated when a message is removed", async () => { @@ -150,10 +150,10 @@ describe("SyncEngine", () => { // Wait for the trie to be updated await sleepWhile(() => syncEngine.syncTrieQSize > 0, SLEEPWHILE_TIMEOUT); - const id = new SyncId(castRemove); + const id = SyncId.fromMessage(castRemove); expect(await syncEngine.trie.exists(id)).toBeTruthy(); - const allMessages = await syncEngine.getAllMessagesBySyncIds([id.syncId()]); + const allMessages = await syncEngine.getAllMessagesBySyncIds([id]); expect(allMessages.isOk()).toBeTruthy(); expect(allMessages._unsafeUnwrap()[0]?.data?.type).toEqual(MessageType.CAST_REMOVE); @@ -161,7 +161,7 @@ describe("SyncEngine", () => { expect(await syncEngine.trie.exists(id)).toBeTruthy(); // The trie should not contain the castAdd anymore - expect(await syncEngine.trie.exists(new SyncId(castAdd))).toBeFalsy(); + expect(await syncEngine.trie.exists(SyncId.fromMessage(castAdd))).toBeFalsy(); }); test("trie is updated for username proof messages", async () => { @@ -200,12 +200,12 @@ describe("SyncEngine", () => { // SignerAdd and Username proof is added to the trie expect((await syncEngine.trie.items()) - existingItems).toEqual(1); - expect(await syncEngine.trie.exists(new SyncId(proof))).toBeTruthy(); + expect(await syncEngine.trie.exists(SyncId.fromMessage(proof))).toBeTruthy(); }); test("getAllMessages returns empty with invalid syncId", async () => { expect(await syncEngine.trie.items()).toEqual(0); - const result = await syncEngine.getAllMessagesBySyncIds([new SyncId(castAdd).syncId()]); + const result = await syncEngine.getAllMessagesBySyncIds([SyncId.fromMessage(castAdd)]); expect(result.isOk()).toBeTruthy(); expect(result._unsafeUnwrap()[0]?.data).toBeUndefined(); expect(result._unsafeUnwrap()[0]?.hash.length).toEqual(0); @@ -410,9 +410,9 @@ describe("SyncEngine", () => { // Make sure all messages exist expect(await syncEngine2.trie.items()).toEqual(3); expect(await syncEngine2.trie.rootHash()).toEqual(await syncEngine.trie.rootHash()); - expect(await syncEngine2.trie.exists(new SyncId(messages[0] as Message))).toBeTruthy(); - expect(await syncEngine2.trie.exists(new SyncId(messages[1] as Message))).toBeTruthy(); - expect(await syncEngine2.trie.exists(new SyncId(messages[2] as Message))).toBeTruthy(); + expect(await syncEngine2.trie.exists(SyncId.fromMessage(messages[0] as Message))).toBeTruthy(); + expect(await syncEngine2.trie.exists(SyncId.fromMessage(messages[1] as Message))).toBeTruthy(); + expect(await syncEngine2.trie.exists(SyncId.fromMessage(messages[2] as Message))).toBeTruthy(); await syncEngine2.stop(); }); @@ -434,9 +434,9 @@ describe("SyncEngine", () => { // Make sure all messages exist expect(await syncEngine2.trie.items()).toEqual(3); expect(await syncEngine2.trie.rootHash()).toEqual(await syncEngine.trie.rootHash()); - expect(await syncEngine2.trie.exists(new SyncId(messages[0] as Message))).toBeTruthy(); - expect(await syncEngine2.trie.exists(new SyncId(messages[1] as Message))).toBeTruthy(); - expect(await syncEngine2.trie.exists(new SyncId(messages[2] as Message))).toBeTruthy(); + expect(await syncEngine2.trie.exists(SyncId.fromMessage(messages[0] as Message))).toBeTruthy(); + expect(await syncEngine2.trie.exists(SyncId.fromMessage(messages[1] as Message))).toBeTruthy(); + expect(await syncEngine2.trie.exists(SyncId.fromMessage(messages[2] as Message))).toBeTruthy(); await syncEngine2.stop(); }, diff --git a/apps/hubble/src/network/sync/syncEngine.ts b/apps/hubble/src/network/sync/syncEngine.ts index 0b3b4d5fd8..e008523845 100644 --- a/apps/hubble/src/network/sync/syncEngine.ts +++ b/apps/hubble/src/network/sync/syncEngine.ts @@ -24,7 +24,7 @@ import { err, ok, Result, ResultAsync } from "neverthrow"; import { TypedEmitter } from "tiny-typed-emitter"; import { APP_VERSION, FARCASTER_VERSION, Hub, HubInterface } from "../../hubble.js"; import { MerkleTrie, NodeMetadata } from "./merkleTrie.js"; -import { formatPrefix, prefixToTimestamp, SyncId, timestampToPaddedTimestampPrefix } from "./syncId.js"; +import { formatPrefix, prefixToTimestamp, SyncId, SyncIdType, timestampToPaddedTimestampPrefix } from "./syncId.js"; import { TrieSnapshot } from "./trieNode.js"; import { getManyMessages } from "../../storage/db/message.js"; import RocksDB from "../../storage/db/rocksdb.js"; @@ -268,9 +268,9 @@ class SyncEngine extends TypedEmitter { } /** Revoke the individual syncIDs in the Sync Trie */ - public async revokeSyncIds(syncIds: Uint8Array[]) { + public async revokeSyncIds(syncIds: SyncId[]) { for (const syncId of syncIds) { - await this._trie.deleteByBytes(syncId); + await this._trie.deleteByBytes(syncId.syncId()); } } @@ -638,8 +638,14 @@ class SyncEngine extends TypedEmitter { return fullSyncResult; } - async getAllMessagesBySyncIds(syncIds: Uint8Array[]): HubAsyncResult { - const msgPKs = syncIds.map((syncId) => SyncId.pkFromSyncId(syncId)); + async getAllMessagesBySyncIds(syncIds: SyncId[]): HubAsyncResult { + const msgPKs: Buffer[] = []; + for (const syncId of syncIds) { + const unpacked = syncId.unpack(); + if (unpacked.type === SyncIdType.Message) { + msgPKs.push(Buffer.from(unpacked.primaryKey)); + } + } return ResultAsync.fromPromise(getManyMessages(this._db, msgPKs), (e) => e as HubError); } @@ -681,7 +687,8 @@ class SyncEngine extends TypedEmitter { // Make sure that the messages are actually for the SyncIDs const syncIdHashes = new Set( syncIds - .map((syncId) => bytesToHexString(SyncId.hashFromSyncId(syncId)).unwrapOr("")) + .map((syncId) => SyncId.unpack(syncId)) + .map((syncId) => (syncId.type === SyncIdType.Message ? bytesToHexString(syncId.hash).unwrapOr("") : "")) .filter((str) => str !== ""), ); @@ -773,7 +780,7 @@ class SyncEngine extends TypedEmitter { } else if (result.error.errCode === "bad_request.duplicate") { // This message has been merged into the DB, but for some reason is not in the Trie. // Just update the trie. - await this.trie.insert(new SyncId(msg)); + await this.trie.insert(SyncId.fromMessage(msg)); mergeResults.push(result); errCount += 1; } else { @@ -901,10 +908,13 @@ class SyncEngine extends TypedEmitter { if (ourNode?.prefix) { const suspectSyncIDs = await this.trie.getAllValues(ourNode?.prefix); - const messagesResult = await this.getAllMessagesBySyncIds(suspectSyncIDs); + const messageSyncIds = suspectSyncIDs + .map((s) => SyncId.fromBytes(s)) + .filter((syncId) => syncId.unpack().type === SyncIdType.Message); + const messagesResult = await this.getAllMessagesBySyncIds(messageSyncIds); if (messagesResult.isOk()) { - const corruptedSyncIds = this.findCorruptedSyncIDs(messagesResult.value, suspectSyncIDs); + const corruptedSyncIds = this.findCorruptedSyncIDs(messagesResult.value, messageSyncIds); if (corruptedSyncIds.length > 0) { log.warn( @@ -971,21 +981,21 @@ class SyncEngine extends TypedEmitter { } } - public findCorruptedSyncIDs(messages: Message[], syncIds: Uint8Array[]): Uint8Array[] { + public findCorruptedSyncIDs(messages: Message[], syncIds: SyncId[]): SyncId[] { return messages .map((message, i) => (message.data === undefined || message.hash.length === 0 ? syncIds[i] : undefined)) - .filter((i) => i !== undefined) as Uint8Array[]; + .filter((i) => i !== undefined) as SyncId[]; } /** ---------------------------------------------------------------------------------- */ /** Trie Methods */ /** ---------------------------------------------------------------------------------- */ public async addMessage(message: Message): Promise { - await this._trie.insert(new SyncId(message)); + await this._trie.insert(SyncId.fromMessage(message)); } public async removeMessage(message: Message): Promise { - await this._trie.deleteBySyncId(new SyncId(message)); + await this._trie.deleteBySyncId(SyncId.fromMessage(message)); } public async getTrieNodeMetadata(prefix: Uint8Array): Promise { diff --git a/apps/hubble/src/network/sync/syncId.test.ts b/apps/hubble/src/network/sync/syncId.test.ts index c4c9760983..e253830197 100644 --- a/apps/hubble/src/network/sync/syncId.test.ts +++ b/apps/hubble/src/network/sync/syncId.test.ts @@ -1,6 +1,7 @@ import { Factories, FarcasterNetwork, Message } from "@farcaster/hub-nodejs"; -import { SyncId } from "./syncId.js"; -import { makeMessagePrimaryKeyFromMessage } from "../../storage/db/message.js"; +import { FNameSyncId, MessageSyncId, OnChainEventSyncId, SyncId, SyncIdType, TIMESTAMP_LENGTH } from "./syncId.js"; +import { makeFidKey, makeMessagePrimaryKeyFromMessage } from "../../storage/db/message.js"; +import { FID_BYTES, RootPrefix } from "../../storage/db/types.js"; let message: Message; @@ -14,13 +15,56 @@ beforeAll(async () => { describe("SyncId", () => { test("succeeds", async () => { - const syncId = new SyncId(message).syncId(); + const syncId = SyncId.fromMessage(message).syncId(); expect(syncId).toBeDefined(); }); - test("Test pkFromSyncId", async () => { - // Create a new castAdd - const castAdd1 = await Factories.CastAddMessage.create({ data: { fid, network } }, { transient: { signer } }); - expect(makeMessagePrimaryKeyFromMessage(castAdd1)).toEqual(SyncId.pkFromSyncId(new SyncId(castAdd1).syncId())); + describe("Message syncIds", () => { + test("creates and unpacks correctly from message", async () => { + const castAddMessage = await Factories.CastAddMessage.create( + { data: { fid, network } }, + { transient: { signer } }, + ); + const syncId = SyncId.fromMessage(castAddMessage).syncId(); + const unpackedSyncId = SyncId.unpack(syncId) as MessageSyncId; + expect(unpackedSyncId.type).toEqual(SyncIdType.Message); + expect(unpackedSyncId.fid).toEqual(fid); + expect(unpackedSyncId.hash).toEqual(castAddMessage.hash); + expect(unpackedSyncId.primaryKey).toEqual(makeMessagePrimaryKeyFromMessage(castAddMessage)); + }); + }); + + describe("FName syncIds", () => { + test("creates and unpacks correctly from message", async () => { + const fnameProof = Factories.UserNameProof.build(); + const syncId = SyncId.fromFName(fnameProof).syncId(); + const unpackedSyncId = SyncId.unpack(syncId) as FNameSyncId; + expect(unpackedSyncId.type).toEqual(SyncIdType.FName); + expect(unpackedSyncId.fid).toEqual(fnameProof.fid); + expect(unpackedSyncId.name).toEqual(fnameProof.name); + }); + }); + + describe("OnChainEvent syncIds", () => { + test("creates and unpacks correctly from message", async () => { + const onChainEvent = Factories.IdRegistryOnChainEvent.build(); + const syncId = SyncId.fromOnChainEvent(onChainEvent).syncId(); + const unpackedSyncId = SyncId.unpack(syncId) as OnChainEventSyncId; + expect(unpackedSyncId.type).toEqual(SyncIdType.OnChainEvent); + expect(unpackedSyncId.fid).toEqual(onChainEvent.fid); + expect(unpackedSyncId.blockNumber).toEqual(onChainEvent.blockNumber); + }); + }); + + describe("unknown syncIds", () => { + test("always returns 0 fid", async () => { + const bytes = new Uint8Array(TIMESTAMP_LENGTH + 1 + FID_BYTES + 1); + bytes.set([RootPrefix.HubEvents], TIMESTAMP_LENGTH); + const fid = Factories.Fid.build(); + bytes.set(makeFidKey(fid), TIMESTAMP_LENGTH + 1); + const unpackedSyncId = SyncId.unpack(bytes); + expect(unpackedSyncId.type).toEqual(SyncIdType.Unknown); + expect(unpackedSyncId.fid).toEqual(0); + }); }); }); diff --git a/apps/hubble/src/network/sync/syncId.ts b/apps/hubble/src/network/sync/syncId.ts index fcb4cd3335..c75054ee23 100644 --- a/apps/hubble/src/network/sync/syncId.ts +++ b/apps/hubble/src/network/sync/syncId.ts @@ -1,45 +1,160 @@ -import { makeMessagePrimaryKey, typeToSetPostfix } from "../../storage/db/message.js"; -import { FID_BYTES, HASH_LENGTH } from "../../storage/db/types.js"; -import { Message } from "@farcaster/hub-nodejs"; +import { makeFidKey, makeMessagePrimaryKey, typeToSetPostfix } from "../../storage/db/message.js"; +import { FID_BYTES, HASH_LENGTH, RootPrefix } from "../../storage/db/types.js"; +import { Message, OnChainEvent, toFarcasterTime, UserNameProof } from "@farcaster/hub-nodejs"; +import { makeOnChainEventPrimaryKey } from "../../storage/db/onChainEvent.js"; const TIMESTAMP_LENGTH = 10; // Used to represent a decimal timestamp +// Typescript enum for syncid types +export enum SyncIdType { + Unknown = 0, + Message = 1, + FName = 2, + OnChainEvent = 3, +} + +type BaseUnpackedSyncId = { + fid: number; + type: SyncIdType; +}; + +export type UnknownSyncId = BaseUnpackedSyncId & { + type: SyncIdType.Unknown; + fid: 0; +}; + +export type MessageSyncId = BaseUnpackedSyncId & { + type: SyncIdType.Message; + primaryKey: Uint8Array; + hash: Uint8Array; +}; + +export type FNameSyncId = BaseUnpackedSyncId & { + type: SyncIdType.FName; + name: Uint8Array; +}; + +export type OnChainEventSyncId = BaseUnpackedSyncId & { + type: SyncIdType.OnChainEvent; + blockNumber: number; +}; + +export type UnpackedSyncId = UnknownSyncId | MessageSyncId | FNameSyncId | OnChainEventSyncId; + /** - * SyncIds are used to represent a Farcaster Message in the MerkleTrie and are ordered by timestamp - * followed by lexicographical value of some of the message's properties. + * SyncIds are used to represent a Farcaster Message, an FName proof or an OnChainEvent in the MerkleTrie + * and are ordered by timestamp followed by a unique lexicographical identifier based on the type. * - * They are constructed by prefixing the message's rocks db key (fid, rocks db prefix, tsHash) with - * its timestamp. Duplicating the timestamp in the prefix is space inefficient, but allows sorting - * ids by time while also allowing fast lookups using the rocksdb key. + * They are of the format: where: + * timestampstring: is a 10 digit, 0 leftpadded string representing the timestamp in seconds (farcaster epoch) + * root prefix: is a single byte representing the root prefix + * message_ident: fid + message postfix + tshash + * fname_ident: fid + name + * onchainevent_ident: event_type + fid + blocknumber + logindex */ class SyncId { - private readonly _fid: number; - private readonly _hash: Uint8Array; - private readonly _timestamp: number; - private readonly _type: number; - - constructor(message: Message) { - this._fid = message.data?.fid || 0; - this._hash = message.hash; - this._timestamp = message.data?.timestamp || 0; - this._type = message.data?.type || 0; + private readonly _bytes: Uint8Array; + + private constructor(bytes: Uint8Array) { + this._bytes = bytes; + } + private static fromTimestamp(farcasterTimestamp: number, _buffer: Buffer) { + return new SyncId( + new Uint8Array(Buffer.concat([Buffer.from(timestampToPaddedTimestampPrefix(farcasterTimestamp)), _buffer])), + ); } - /** Returns a byte array that represents a SyncId */ - public syncId(): Uint8Array { - const timestampString = timestampToPaddedTimestampPrefix(this._timestamp); + static fromMessage(message: Message): SyncId { + const fid = message.data?.fid || 0; + const hash = message.hash; + const timestamp = message.data?.timestamp || 0; + const type = message.data?.type || 0; // Note: We use the hash directly instead of the tsHash because the "ts" part of the tsHash is // just the timestamp, which is already a part of the key (first 10 bytes) - // When we do the reverse lookup (`pkFromSyncId), we'll remember to add the timestamp back. - const buf = makeMessagePrimaryKey(this._fid, typeToSetPostfix(this._type), this._hash); + // When we unpack the syncid, we'll remember to add the timestamp back. + const pkBuf = makeMessagePrimaryKey(fid, typeToSetPostfix(type), hash); // Construct and returns the Sync Id by prepending the timestamp to the rocksdb message key - return new Uint8Array(Buffer.concat([Buffer.from(timestampString), buf])); + // Note the first byte of the pk is the RootPrefix + return SyncId.fromTimestamp(timestamp, pkBuf); + } + + static fromFName(usernameProof: UserNameProof): SyncId { + const timestampRes = toFarcasterTime(usernameProof.timestamp * 1000); + if (timestampRes.isErr()) { + throw timestampRes.error; + } + return SyncId.fromTimestamp( + timestampRes.value, + Buffer.concat([ + Buffer.from([RootPrefix.FNameUserNameProof]), + makeFidKey(usernameProof.fid), + Buffer.from(usernameProof.name), + ]), + ); + } + + static fromOnChainEvent(onChainEvent: OnChainEvent): SyncId { + const timestampRes = toFarcasterTime(onChainEvent.blockTimestamp * 1000); + if (timestampRes.isErr()) { + throw timestampRes.error; + } + const eventPk = makeOnChainEventPrimaryKey( + onChainEvent.type, + onChainEvent.fid, + onChainEvent.blockNumber, + onChainEvent.logIndex, + ); + return SyncId.fromTimestamp(timestampRes.value, eventPk); + } + + static fromBytes(bytes: Uint8Array): SyncId { + return new SyncId(new Uint8Array(bytes)); + } + + /** Returns a byte array that represents a SyncId */ + public syncId(): Uint8Array { + return this._bytes; + } + + public unpack(): UnpackedSyncId { + return SyncId.unpack(this._bytes); + } + + static unpack(syncId: Uint8Array): UnpackedSyncId { + const rootPrefixOffset = TIMESTAMP_LENGTH; + const rootPrefix = syncId[rootPrefixOffset]; + const idBuf = Buffer.from(syncId); + if (rootPrefix === RootPrefix.User) { + // Message SyncId + return { + type: SyncIdType.Message, + fid: idBuf.readUInt32BE(TIMESTAMP_LENGTH + 1), // 1 byte for the root prefix + primaryKey: this.pkFromSyncId(syncId), + hash: syncId.slice(TIMESTAMP_LENGTH + 1 + FID_BYTES + 1), // 1 byte after fid for the set postfix + }; + } else if (rootPrefix === RootPrefix.FNameUserNameProof) { + return { + type: SyncIdType.FName, + fid: idBuf.readUInt32BE(TIMESTAMP_LENGTH + 1), // 1 byte for the root prefix + name: syncId.slice(TIMESTAMP_LENGTH + 1 + FID_BYTES), + }; + } else if (rootPrefix === RootPrefix.OnChainEvent) { + return { + type: SyncIdType.OnChainEvent, + fid: idBuf.readUInt32BE(TIMESTAMP_LENGTH + 1 + 1 + 1), // 1 byte for the root prefix, 1 byte for the postfix, 1 byte for the event type + blockNumber: idBuf.readUInt32BE(TIMESTAMP_LENGTH + 1 + 1 + 1 + FID_BYTES), + }; + } + return { + type: SyncIdType.Unknown, + fid: 0, + }; } /** Returns the rocks db primary key used to look up the message */ - static pkFromSyncId(syncId: Uint8Array): Buffer { + private static pkFromSyncId(syncId: Uint8Array): Buffer { const ts = syncId.slice(0, TIMESTAMP_LENGTH); const tsBE = Buffer.alloc(4); tsBE.writeUInt32BE(parseInt(Buffer.from(ts).toString(), 10), 0); @@ -57,11 +172,6 @@ class SyncId { return Buffer.from(pk); } - - /** Return the message hash for the SyncID */ - static hashFromSyncId(syncId: Uint8Array): Uint8Array { - return syncId.slice(TIMESTAMP_LENGTH + 1 + FID_BYTES + 1); - } } /** Normalizes the timestamp in seconds to fixed length to ensure consistent depth in the trie */ diff --git a/apps/hubble/src/network/utils/factories.ts b/apps/hubble/src/network/utils/factories.ts index 567a8cd467..e3f7620d95 100644 --- a/apps/hubble/src/network/utils/factories.ts +++ b/apps/hubble/src/network/utils/factories.ts @@ -60,7 +60,7 @@ const SyncIdFactory = Factory.define SyncId.fromBytes(syncId)); + const messagesResult = await this.syncEngine?.getAllMessagesBySyncIds(syncIds); messagesResult?.match( (messages) => { // Check the messages for corruption. If a message is blank, that means it was present // in our sync trie, but the DB couldn't find it. So remove it from the sync Trie. - const corruptedSyncIds = this.syncEngine?.findCorruptedSyncIDs(messages, request.syncIds); + const corruptedSyncIds = this.syncEngine?.findCorruptedSyncIDs(messages, syncIds); if ((corruptedSyncIds?.length ?? 0) > 0) { log.warn( diff --git a/apps/hubble/src/test/bench/helpers.ts b/apps/hubble/src/test/bench/helpers.ts index 525d21a037..d2511f3df8 100644 --- a/apps/hubble/src/test/bench/helpers.ts +++ b/apps/hubble/src/test/bench/helpers.ts @@ -40,7 +40,7 @@ export const generateSyncIds = (n: number, numFids = 1, maxTimeShift = 1): SyncI */ export const fastSyncId = (fid: number, hash: Uint8Array, timestamp: number, type: number) => { // Ducktyping message model to avoid creating the whole message. - return new SyncId({ + return SyncId.fromMessage({ data: { fid, timestamp, diff --git a/packages/core/src/factories.ts b/packages/core/src/factories.ts index 60b0b01236..4ed34406fb 100644 --- a/packages/core/src/factories.ts +++ b/packages/core/src/factories.ts @@ -21,7 +21,7 @@ import { } from "./protobufs"; import { bytesToHexString, utf8StringToBytes } from "./bytes"; import { Ed25519Signer, Eip712Signer, NobleEd25519Signer, Signer, ViemLocalEip712Signer } from "./signers"; -import { getFarcasterTime, toFarcasterTime } from "./time"; +import { FARCASTER_EPOCH, getFarcasterTime, toFarcasterTime } from "./time"; import { VerificationEthAddressClaim } from "./verifications"; import { LocalAccount } from "viem"; @@ -577,7 +577,7 @@ const OnChainEventFactory = Factory.define(() => { fid: FidFactory.build(), blockNumber: faker.datatype.number({ min: 1, max: 100_000 }), blockHash: BlockHashFactory.build(), - blockTimestamp: faker.datatype.datetime().getTime(), + blockTimestamp: Math.floor(faker.datatype.datetime({ min: FARCASTER_EPOCH }).getTime() / 1000), transactionHash: TransactionHashFactory.build(), logIndex: faker.datatype.number({ min: 0, max: 1_000 }), });