diff --git a/apps/hubble/CHANGELOG.md b/apps/hubble/CHANGELOG.md index 58d4478246..6095ce38c7 100644 --- a/apps/hubble/CHANGELOG.md +++ b/apps/hubble/CHANGELOG.md @@ -1,5 +1,15 @@ # @farcaster/hubble +## 1.13.5 + +### Patch Changes + +- 224e75fa: fix: gossip contact info every 30 minutes instead of every minute, avoid gossiping contact info on peer connect, skip contact info updates that happen too frequently +- c723f655: feat: Add endpoints to control sync +- 667a5b30: feat: add experimental HTTP APIs to control sync +- Updated dependencies [c723f655] + - @farcaster/hub-nodejs@0.11.19 + ## 1.13.4 ### Patch Changes diff --git a/apps/hubble/package.json b/apps/hubble/package.json index 6b429db7af..a9c1cab45e 100644 --- a/apps/hubble/package.json +++ b/apps/hubble/package.json @@ -1,6 +1,6 @@ { "name": "@farcaster/hubble", - "version": "1.13.4", + "version": "1.13.5", "description": "Farcaster Hub", "author": "", "license": "", @@ -75,7 +75,7 @@ "@chainsafe/libp2p-noise": "^11.0.0 ", "@datastructures-js/priority-queue": "^6.3.1", "@faker-js/faker": "~7.6.0", - "@farcaster/hub-nodejs": "^0.11.18", + "@farcaster/hub-nodejs": "^0.11.19", "@fastify/cors": "^8.4.0", "@figma/hot-shots": "^9.0.0-figma.1", "@grpc/grpc-js": "~1.8.22", diff --git a/apps/hubble/src/hubble.ts b/apps/hubble/src/hubble.ts index 97b27b3046..77cf7f5c9b 100644 --- a/apps/hubble/src/hubble.ts +++ b/apps/hubble/src/hubble.ts @@ -123,7 +123,8 @@ export const FARCASTER_VERSIONS_SCHEDULE: VersionSchedule[] = [ { version: "2024.6.12", expiresAt: 1722988800000 }, // expires at 8/7/24 00:00 UTC ]; -const MAX_CONTACT_INFO_AGE_MS = GOSSIP_SEEN_TTL; +const MAX_CONTACT_INFO_AGE_MS = 1000 * 60 * 60; // 60 minutes +const CONTACT_INFO_UPDATE_THRESHOLD_MS = 1000 * 60 * 30; // 30 minutes export interface HubInterface { engine: Engine; @@ -349,7 +350,6 @@ export class Hub implements HubInterface { private adminServer: AdminServer; private httpApiServer: HttpAPIServer; - private contactTimer?: NodeJS.Timer; private rocksDB: RocksDB; private syncEngine: SyncEngine; private allowedPeerIds: string[] | undefined; @@ -790,7 +790,9 @@ export class Hub implements HubInterface { this.pruneEventsJobScheduler.start(this.options.pruneEventsJobCron); this.checkFarcasterVersionJobScheduler.start(); this.validateOrRevokeMessagesJobScheduler.start(); - this.gossipContactInfoJobScheduler.start("*/1 * * * *"); // Every minute + + const randomMinute = Math.floor(Math.random() * 30); + this.gossipContactInfoJobScheduler.start(`${randomMinute} */30 * * * *`); // Random minute every 30 minutes this.checkIncomingPortsJobScheduler.start(); // Mainnet only jobs @@ -1176,7 +1178,6 @@ export class Hub implements HubInterface { /** Stop the GossipNode and RPC Server */ async stop(reason: HubShutdownReason, terminateGossipWorker = true) { log.info("Stopping Hubble..."); - clearInterval(this.contactTimer); // First, stop the RPC/Gossip server so we don't get any more messages if (!this.options.httpServerDisabled) { @@ -1515,7 +1516,7 @@ export class Hub implements HubInterface { log.debug({ identity: this.identity, peer: peerId, message }, "received peer ContactInfo"); // Check if we already have this client - const result = this.syncEngine.addContactInfoForPeerId(peerId, message); + const result = this.syncEngine.addContactInfoForPeerId(peerId, message, CONTACT_INFO_UPDATE_THRESHOLD_MS); if (result.isOk() && !this.performedFirstSync) { // Sync with the first peer so we are upto date on startup. this.performedFirstSync = true; @@ -1642,11 +1643,15 @@ export class Hub implements HubInterface { }); this.gossipNode.on("peerConnect", async () => { + // NB: Gossiping our own contact info is commented out, since at the time of + // writing this the p2p network has overwhelming number of peers and spends more + // time processing contact info than messages. We may uncomment in the future + // if peer counts drop. // When we connect to a new node, gossip out our contact info 1 second later. // The setTimeout is to ensure that we have a chance to receive the peer's info properly. - setTimeout(async () => { - await this.gossipContactInfo(); - }, 1 * 1000); + // setTimeout(async () => { + // await this.gossipContactInfo(); + // }, 1 * 1000); statsd().increment("peer_connect.count"); }); diff --git a/apps/hubble/src/network/sync/syncEngine.test.ts b/apps/hubble/src/network/sync/syncEngine.test.ts index 7db261138f..30f78e2fa0 100644 --- a/apps/hubble/src/network/sync/syncEngine.test.ts +++ b/apps/hubble/src/network/sync/syncEngine.test.ts @@ -584,7 +584,8 @@ describe("SyncEngine", () => { const peerId = await createEd25519PeerId(); expect(syncEngine.getContactInfoForPeerId(peerId.toString())).toBeUndefined(); - expect(syncEngine.addContactInfoForPeerId(peerId, contactInfo)).toBeInstanceOf(Ok); + const updateThresholdMilliseconds = 0; + expect(syncEngine.addContactInfoForPeerId(peerId, contactInfo, updateThresholdMilliseconds)).toBeInstanceOf(Ok); expect(syncEngine.getContactInfoForPeerId(peerId.toString())?.contactInfo).toEqual(contactInfo); expect(syncEngine.getContactInfoForPeerId(peerId.toString())?.peerId).toEqual(peerId); }); @@ -596,15 +597,22 @@ describe("SyncEngine", () => { const newerContactInfo = NetworkFactories.GossipContactInfoContent.build({ timestamp: now + 10 }); const peerId = await createEd25519PeerId(); - expect(syncEngine.addContactInfoForPeerId(peerId, contactInfo)).toBeInstanceOf(Ok); + // NB: We set update value to 0, but there may non-determinism if test runs too quickly. If the tests start getting + // too flaky, we can sleep for a millisecond between function calls to make sure the time elapsed is greater than 0. + const updateThresholdMilliseconds = 0; + expect(syncEngine.addContactInfoForPeerId(peerId, contactInfo, updateThresholdMilliseconds)).toBeInstanceOf(Ok); expect(syncEngine.getContactInfoForPeerId(peerId.toString())?.contactInfo).toEqual(contactInfo); // Adding an older contact info should not replace the existing one - expect(syncEngine.addContactInfoForPeerId(peerId, olderContactInfo)).toBeInstanceOf(Err); + expect(syncEngine.addContactInfoForPeerId(peerId, olderContactInfo, updateThresholdMilliseconds)).toBeInstanceOf( + Err, + ); expect(syncEngine.getContactInfoForPeerId(peerId.toString())?.contactInfo).toEqual(contactInfo); // Adding a newer contact info should replace the existing one - expect(syncEngine.addContactInfoForPeerId(peerId, newerContactInfo)).toBeInstanceOf(Ok); + expect(syncEngine.addContactInfoForPeerId(peerId, newerContactInfo, updateThresholdMilliseconds)).toBeInstanceOf( + Ok, + ); expect(syncEngine.getContactInfoForPeerId(peerId.toString())?.contactInfo).toEqual(newerContactInfo); }); }); diff --git a/apps/hubble/src/network/sync/syncEngine.ts b/apps/hubble/src/network/sync/syncEngine.ts index e309d0c25d..3d97562213 100644 --- a/apps/hubble/src/network/sync/syncEngine.ts +++ b/apps/hubble/src/network/sync/syncEngine.ts @@ -77,6 +77,8 @@ const log = logger.child({ component: "SyncEngine", }); +type NonNegativeInteger = `${T}` extends `-${string}` | `${string}.${string}` ? never : T; + interface SyncEvents { /** Emit an event when diff starts */ syncStart: () => void; @@ -493,6 +495,18 @@ class SyncEngine extends TypedEmitter { } public async stop() { + await this.stopSync(); + await this._trie.stop(); + + this._started = false; + this.curSync.interruptSync = false; + log.info("Sync engine stopped"); + } + + public async stopSync() { + if (!this.isSyncing()) { + return true; + } // Interrupt any ongoing sync this.curSync.interruptSync = true; @@ -502,13 +516,9 @@ class SyncEngine extends TypedEmitter { await sleepWhile(() => this.syncTrieQSize > 0, SYNC_INTERRUPT_TIMEOUT); } catch (e) { log.error({ err: e }, "Interrupting sync timed out"); + return false; } - - await this._trie.stop(); - - this._started = false; - this.curSync.interruptSync = false; - log.info("Sync engine stopped"); + return true; } public getBadPeerIds(): string[] { @@ -539,27 +549,39 @@ class SyncEngine extends TypedEmitter { return this.currentHubPeerContacts.values(); } - public addContactInfoForPeerId(peerId: PeerId, contactInfo: ContactInfoContentBody) { + public addContactInfoForPeerId( + peerId: PeerId, + contactInfo: ContactInfoContentBody, + updateThresholdMilliseconds: NonNegativeInteger, + ) { const existingPeerInfo = this.getContactInfoForPeerId(peerId.toString()); if (existingPeerInfo && contactInfo.timestamp <= existingPeerInfo.contactInfo.timestamp) { return err(new HubError("bad_request.duplicate", "peer already exists")); } - log.info( - { - peerInfo: contactInfo, - theirMessages: contactInfo.count, - peerNetwork: contactInfo.network, - peerVersion: contactInfo.hubVersion, - peerAppVersion: contactInfo.appVersion, - connectedPeers: this.getPeerCount(), - peerId: peerId.toString(), - isNew: !!existingPeerInfo, - gossipDelay: (Date.now() - contactInfo.timestamp) / 1000, - }, - "Updated Peer ContactInfo", - ); - this.currentHubPeerContacts.set(peerId.toString(), { peerId, contactInfo }); - return ok(undefined); + const previousTimestamp = existingPeerInfo ? existingPeerInfo.contactInfo.timestamp : -Infinity; + const elapsed = Date.now() - previousTimestamp; + + // only update if contact info was updated more than ${updateThresholdMilliseconds} ago + if (elapsed > updateThresholdMilliseconds) { + log.info( + { + peerInfo: contactInfo, + theirMessages: contactInfo.count, + peerNetwork: contactInfo.network, + peerVersion: contactInfo.hubVersion, + peerAppVersion: contactInfo.appVersion, + connectedPeers: this.getPeerCount(), + peerId: peerId.toString(), + isNew: !!existingPeerInfo, + gossipDelay: (Date.now() - contactInfo.timestamp) / 1000, + }, + "Updated Peer ContactInfo", + ); + this.currentHubPeerContacts.set(peerId.toString(), { peerId, contactInfo }); + return ok(undefined); + } else { + return err(new HubError("bad_request.duplicate", "recent contact update found for peer")); + } } public removeContactInfoForPeerId(peerId: string) { @@ -795,6 +817,46 @@ class SyncEngine extends TypedEmitter { } } + public async forceSyncWithPeer(peerId: string) { + if (this.isSyncing()) { + return err(new HubError("bad_request", "Already syncing")); + } + + const contactInfo = this.getContactInfoForPeerId(peerId); + if (!contactInfo) { + return err(new HubError("bad_request", "Peer not found")); + } + + const rpcClient = await this._hub.getRPCClientForPeer(contactInfo.peerId, contactInfo.contactInfo); + if (!rpcClient) { + return err(new HubError("bad_request", "Unreachable peer")); + } + + log.info({ peerId }, "Force sync: Starting sync"); + + const peerStateResult = await rpcClient.getSyncSnapshotByPrefix( + TrieNodePrefix.create({ prefix: new Uint8Array() }), + new Metadata(), + rpcDeadline(), + ); + + if (peerStateResult.isErr()) { + return err(peerStateResult.error); + } + + const syncStatus = await this.syncStatus(peerId, peerStateResult.value); + if (syncStatus.isErr()) { + return err(syncStatus.error); + } + + // Ignore sync status because we always want to sync, we return it to the clients can get visibility into the peer's state + // Intentionally not available here, so the grpc call can succeed immediately + this.performSync(peerId, rpcClient, false).then((result) => { + log.info({ result }, "Force sync: complete"); + }); + return ok(syncStatus.value); + } + public async syncStatus(peerId: string, theirSnapshot: TrieSnapshot): HubAsyncResult { const ourSnapshotResult = await this.getSnapshot(theirSnapshot.prefix); diff --git a/apps/hubble/src/rpc/httpServer.ts b/apps/hubble/src/rpc/httpServer.ts index 7f9ca586a8..22f99a06c4 100644 --- a/apps/hubble/src/rpc/httpServer.ts +++ b/apps/hubble/src/rpc/httpServer.ts @@ -23,6 +23,7 @@ import { ValidationResponse, base58ToBytes, bytesToBase58, + SyncStatusResponse, } from "@farcaster/hub-nodejs"; import { Metadata, ServerUnaryCall } from "@grpc/grpc-js"; import fastify from "fastify"; @@ -266,6 +267,29 @@ export class HttpAPIServer { this.grpcImpl.getCurrentPeers(call, handleResponse(reply, ContactInfoResponse)); }); + //================stopSync================ + // @doc-tag: /stopSync + this.app.post("/v1/stopSync", (request, reply) => { + const call = getCallObject("stopSync", {}, request); + this.grpcImpl.stopSync(call, handleResponse(reply, SyncStatusResponse)); + }); + + //================syncStatus================ + // @doc-tag: /syncStatus?peer_id=... + this.app.get<{ Querystring: { peer_id: string } }>("/v1/syncStatus", (request, reply) => { + const { peer_id } = request.query; + const call = getCallObject("getSyncStatus", { peerId: peer_id }, request); + this.grpcImpl.getSyncStatus(call, handleResponse(reply, SyncStatusResponse)); + }); + + //================forceSync================ + // @doc-tag: /forceSync?peer_id=... + this.app.post<{ Querystring: { peer_id: string } }>("/v1/forceSync", (request, reply) => { + const { peer_id } = request.query; + const call = getCallObject("forceSync", { peerId: peer_id }, request); + this.grpcImpl.forceSync(call, handleResponse(reply, SyncStatusResponse)); + }); + //================Casts================ // @doc-tag: /castById?fid=...&hash=... this.app.get<{ Querystring: { fid: string; hash: string } }>("/v1/castById", (request, reply) => { diff --git a/apps/hubble/src/rpc/server.ts b/apps/hubble/src/rpc/server.ts index e446bccabf..83ff5134b7 100644 --- a/apps/hubble/src/rpc/server.ts +++ b/apps/hubble/src/rpc/server.ts @@ -488,20 +488,71 @@ export default class Server { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); log.debug({ method: "getCurrentPeers", req: call.request }, `RPC call from ${peer}`); - (async () => { - const currentHubPeerContacts = this.syncEngine?.getCurrentHubPeerContacts(); + const currentHubPeerContacts = this.syncEngine?.getCurrentHubPeerContacts(); - if (!currentHubPeerContacts) { - callback(null, ContactInfoResponse.create({ contacts: [] })); - return; - } + if (!currentHubPeerContacts) { + callback(null, ContactInfoResponse.create({ contacts: [] })); + return; + } - const contactInfoArray = Array.from(currentHubPeerContacts).map((peerContact) => peerContact.contactInfo); - callback(null, ContactInfoResponse.create({ contacts: contactInfoArray })); - })(); + const contactInfoArray = Array.from(currentHubPeerContacts).map((peerContact) => peerContact.contactInfo); + callback(null, ContactInfoResponse.create({ contacts: contactInfoArray })); + })(); + }, + stopSync: (call, callback) => { + (async () => { + const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); + log.debug({ method: "stopSync", req: call.request }, `RPC call from ${peer}`); + + const result = await this.syncEngine?.stopSync(); + if (!result) { + callback(toServiceError(new HubError("bad_request", "Stop sync timed out"))); + } else { + callback( + null, + SyncStatusResponse.create({ + isSyncing: this.syncEngine?.isSyncing() || false, + engineStarted: this.syncEngine?.isStarted() || false, + syncStatus: [], + }), + ); + } })(); }, + forceSync: (call, callback) => { + (async () => { + const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); + log.debug({ method: "forceSync", req: call.request }, `RPC call from ${peer}`); + const peerId = call.request.peerId; + if (!peerId || peerId.length === 0) { + callback(toServiceError(new HubError("bad_request", "peerId is required"))); + return; + } + const result = await this.syncEngine?.forceSyncWithPeer(peerId); + if (!result || result.isErr()) { + callback(toServiceError(result?.error || new HubError("bad_request", "sync engine not available"))); + } else { + const status = result.value; + const response = SyncStatusResponse.create({ + isSyncing: this.syncEngine?.isSyncing() || false, + engineStarted: this.syncEngine?.isStarted() || false, + syncStatus: [ + SyncStatus.create({ + peerId, + inSync: status.inSync, + shouldSync: status.shouldSync, + lastBadSync: status.lastBadSync, + ourMessages: status.ourSnapshot.numMessages, + theirMessages: status.theirSnapshot.numMessages, + score: status.score, + }), + ], + }); + callback(null, response); + } + })(); + }, getSyncStatus: (call, callback) => { (async () => { const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown"); diff --git a/apps/hubble/src/rpc/test/httpServer.test.ts b/apps/hubble/src/rpc/test/httpServer.test.ts index ffb50cf159..09bc6591af 100644 --- a/apps/hubble/src/rpc/test/httpServer.test.ts +++ b/apps/hubble/src/rpc/test/httpServer.test.ts @@ -14,6 +14,7 @@ import { ReactionAddMessage, ReactionType, reactionTypeToJSON, + SyncStatusResponse, toFarcasterTime, UserDataAddMessage, UserDataType, @@ -838,6 +839,25 @@ describe("httpServer", () => { }); }); +describe("sync APIs", () => { + test("stopSync", async () => { + const url = getFullUrl("/v1/stopSync"); + const response = await axios.post(url, {}); + + expect(response.status).toBe(200); + expect(response.data).toEqual( + protoToJSON( + SyncStatusResponse.create({ + isSyncing: false, + engineStarted: true, + syncStatus: [], + }), + SyncStatusResponse, + ), + ); + }); +}); + async function axiosGet(url: string) { try { return await axios.get(url); diff --git a/apps/hubble/src/rpc/test/syncService.test.ts b/apps/hubble/src/rpc/test/syncService.test.ts index 4cb97d5065..2f00ccf2f1 100644 --- a/apps/hubble/src/rpc/test/syncService.test.ts +++ b/apps/hubble/src/rpc/test/syncService.test.ts @@ -4,6 +4,7 @@ import { Factories, FarcasterNetwork, getInsecureHubRpcClient, + HubError, HubInfoRequest, HubRpcClient, OnChainEvent, @@ -100,3 +101,19 @@ describe("getSyncStatus", () => { expect(result._unsafeUnwrap().syncStatus).toHaveLength(0); }); }); + +describe("stopSync", () => { + test("succeeds", async () => { + const result = await client.stopSync({}); + expect(result.isOk()).toBeTruthy(); + expect(result._unsafeUnwrap().isSyncing).toEqual(false); + expect(result._unsafeUnwrap().syncStatus).toHaveLength(0); + }); +}); + +describe("forceSync", () => { + test("fails when peer is not found", async () => { + const result = await client.forceSync(SyncStatusRequest.create({ peerId: "test" })); + expect(result._unsafeUnwrapErr()).toEqual(new HubError("bad_request", "Peer not found")); + }); +}); diff --git a/apps/hubble/www/docs/docs/httpapi/sync.md b/apps/hubble/www/docs/docs/httpapi/sync.md new file mode 100644 index 0000000000..0e7c595651 --- /dev/null +++ b/apps/hubble/www/docs/docs/httpapi/sync.md @@ -0,0 +1,102 @@ +# Sync API +> ⚠️ **WARNING:** +> These APIs are experimental and should not be relied on for general use. They may change without notice or be removed in future versions. + +## stopSync + +Stop the synchronization process + +**Query Parameters** +| Parameter | Description | Example | +| --------- | ----------- | ------- | +| | This endpoint accepts no parameters | | + +- **Example** + +```bash +curl -X POST http://127.0.0.1:2281/v1/stopSync +``` + +**Response** + +```json +{ + "isSyncing": false, + "syncStatus": [], + "engineStarted": true +} +``` + +## syncStatus + +Get the current synchronization status + +**Query Parameters** +| Parameter | Description | Example | +| --------- | ----------- | ------- | +| peer_id | ID of the peer to get sync status for | 12D3KooWJJ9h4XVrVKgMr8ZgF6FKasEBiFEGtL7bmE8RQgzhKq1o | + +- **Example** + +```bash +curl http://127.0.0.1:2281/v1/syncStatus?peer_id=12D3KooWJJ9h4XVrVKgMr8ZgF6FKasEBiFEGtL7bmE8RQgzhKq1o +``` + +**Response** + +```json +{ + "isSyncing": true, + "syncStatus": [ + { + "peerId": "12D3KooWJJ9h4XVrVKgMr8ZgF6FKasEBiFEGtL7bmE8RQgzhKq1o", + "inSync": "true", + "shouldSync": true, + "divergencePrefix": "0x1234567890abcdef", + "divergenceSecondsAgo": 300, + "theirMessages": 1000000, + "ourMessages": 999950, + "lastBadSync": 1705796040744, + "score": 0.95 + } + ], + "engineStarted": true +} +``` + +## forceSync + +Force synchronization with a specific peer + +**Query Parameters** +| Parameter | Description | Example | +| --------- | ----------- | ------- | +| peer_id | ID of the peer to force sync with | 12D3KooWJJ9h4XVrVKgMr8ZgF6FKasEBiFEGtL7bmE8RQgzhKq1o | + +- **Example** + +```bash +curl -X POST http://127.0.0.1:2281/v1/forceSync?peer_id=12D3KooWJJ9h4XVrVKgMr8ZgF6FKasEBiFEGtL7bmE8RQgzhKq1o +``` + +**Response** + +```json +{ + "isSyncing": true, + "syncStatus": [ + { + "peerId": "12D3KooWJJ9h4XVrVKgMr8ZgF6FKasEBiFEGtL7bmE8RQgzhKq1o", + "inSync": "false", + "shouldSync": true, + "divergencePrefix": "0x1234567890abcdef", + "divergenceSecondsAgo": 0, + "theirMessages": 1000000, + "ourMessages": 999950, + "lastBadSync": 0, + "score": 1.0 + } + ], + "engineStarted": true +} +``` diff --git a/packages/hub-nodejs/CHANGELOG.md b/packages/hub-nodejs/CHANGELOG.md index 0ec634cbe4..86cafe0fd2 100644 --- a/packages/hub-nodejs/CHANGELOG.md +++ b/packages/hub-nodejs/CHANGELOG.md @@ -1,5 +1,11 @@ # @farcaster/hub-nodejs +## 0.11.19 + +### Patch Changes + +- c723f655: feat: Add endpoints to control sync + ## 0.11.18 ### Patch Changes diff --git a/packages/hub-nodejs/package.json b/packages/hub-nodejs/package.json index 0def7f2516..31acf123c5 100644 --- a/packages/hub-nodejs/package.json +++ b/packages/hub-nodejs/package.json @@ -1,6 +1,6 @@ { "name": "@farcaster/hub-nodejs", - "version": "0.11.18", + "version": "0.11.19", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", diff --git a/packages/hub-nodejs/src/generated/rpc.ts b/packages/hub-nodejs/src/generated/rpc.ts index c002832345..10fe4fb309 100644 --- a/packages/hub-nodejs/src/generated/rpc.ts +++ b/packages/hub-nodejs/src/generated/rpc.ts @@ -434,6 +434,29 @@ export const HubServiceService = { responseDeserialize: (value: Buffer) => ContactInfoResponse.decode(value), }, /** @http-api: none */ + stopSync: { + path: "/HubService/StopSync", + requestStream: false, + responseStream: false, + requestSerialize: (value: Empty) => Buffer.from(Empty.encode(value).finish()), + requestDeserialize: (value: Buffer) => Empty.decode(value), + responseSerialize: (value: SyncStatusResponse) => Buffer.from(SyncStatusResponse.encode(value).finish()), + responseDeserialize: (value: Buffer) => SyncStatusResponse.decode(value), + }, + /** + * This is experimental, do not rely on this endpoint existing in the future + * @http-api: none + */ + forceSync: { + path: "/HubService/ForceSync", + requestStream: false, + responseStream: false, + requestSerialize: (value: SyncStatusRequest) => Buffer.from(SyncStatusRequest.encode(value).finish()), + requestDeserialize: (value: Buffer) => SyncStatusRequest.decode(value), + responseSerialize: (value: SyncStatusResponse) => Buffer.from(SyncStatusResponse.encode(value).finish()), + responseDeserialize: (value: Buffer) => SyncStatusResponse.decode(value), + }, + /** @http-api: none */ getSyncStatus: { path: "/HubService/GetSyncStatus", requestStream: false, @@ -578,6 +601,13 @@ export interface HubServiceServer extends UntypedServiceImplementation { getInfo: handleUnaryCall; getCurrentPeers: handleUnaryCall; /** @http-api: none */ + stopSync: handleUnaryCall; + /** + * This is experimental, do not rely on this endpoint existing in the future + * @http-api: none + */ + forceSync: handleUnaryCall; + /** @http-api: none */ getSyncStatus: handleUnaryCall; /** @http-api: none */ getAllSyncIdsByPrefix: handleUnaryCall; @@ -1163,6 +1193,41 @@ export interface HubServiceClient extends Client { callback: (error: ServiceError | null, response: ContactInfoResponse) => void, ): ClientUnaryCall; /** @http-api: none */ + stopSync( + request: Empty, + callback: (error: ServiceError | null, response: SyncStatusResponse) => void, + ): ClientUnaryCall; + stopSync( + request: Empty, + metadata: Metadata, + callback: (error: ServiceError | null, response: SyncStatusResponse) => void, + ): ClientUnaryCall; + stopSync( + request: Empty, + metadata: Metadata, + options: Partial, + callback: (error: ServiceError | null, response: SyncStatusResponse) => void, + ): ClientUnaryCall; + /** + * This is experimental, do not rely on this endpoint existing in the future + * @http-api: none + */ + forceSync( + request: SyncStatusRequest, + callback: (error: ServiceError | null, response: SyncStatusResponse) => void, + ): ClientUnaryCall; + forceSync( + request: SyncStatusRequest, + metadata: Metadata, + callback: (error: ServiceError | null, response: SyncStatusResponse) => void, + ): ClientUnaryCall; + forceSync( + request: SyncStatusRequest, + metadata: Metadata, + options: Partial, + callback: (error: ServiceError | null, response: SyncStatusResponse) => void, + ): ClientUnaryCall; + /** @http-api: none */ getSyncStatus( request: SyncStatusRequest, callback: (error: ServiceError | null, response: SyncStatusResponse) => void, diff --git a/packages/hub-web/CHANGELOG.md b/packages/hub-web/CHANGELOG.md index f21d5c2432..c41ce08981 100644 --- a/packages/hub-web/CHANGELOG.md +++ b/packages/hub-web/CHANGELOG.md @@ -1,5 +1,11 @@ # @farcaster/hub-web +## 0.8.12 + +### Patch Changes + +- c723f655: feat: Add endpoints to control sync + ## 0.8.11 ### Patch Changes diff --git a/packages/hub-web/package.json b/packages/hub-web/package.json index ec4e335f40..1610bad74d 100644 --- a/packages/hub-web/package.json +++ b/packages/hub-web/package.json @@ -1,6 +1,6 @@ { "name": "@farcaster/hub-web", - "version": "0.8.11", + "version": "0.8.12", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", diff --git a/packages/hub-web/src/generated/rpc.ts b/packages/hub-web/src/generated/rpc.ts index d6e8c9f6fd..31b165ac46 100644 --- a/packages/hub-web/src/generated/rpc.ts +++ b/packages/hub-web/src/generated/rpc.ts @@ -152,6 +152,13 @@ export interface HubService { getInfo(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise; getCurrentPeers(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise; /** @http-api: none */ + stopSync(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise; + /** + * This is experimental, do not rely on this endpoint existing in the future + * @http-api: none + */ + forceSync(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise; + /** @http-api: none */ getSyncStatus(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise; /** @http-api: none */ getAllSyncIdsByPrefix(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise; @@ -210,6 +217,8 @@ export class HubServiceClientImpl implements HubService { this.getLinkCompactStateMessageByFid = this.getLinkCompactStateMessageByFid.bind(this); this.getInfo = this.getInfo.bind(this); this.getCurrentPeers = this.getCurrentPeers.bind(this); + this.stopSync = this.stopSync.bind(this); + this.forceSync = this.forceSync.bind(this); this.getSyncStatus = this.getSyncStatus.bind(this); this.getAllSyncIdsByPrefix = this.getAllSyncIdsByPrefix.bind(this); this.getAllMessagesBySyncIds = this.getAllMessagesBySyncIds.bind(this); @@ -383,6 +392,14 @@ export class HubServiceClientImpl implements HubService { return this.rpc.unary(HubServiceGetCurrentPeersDesc, Empty.fromPartial(request), metadata); } + stopSync(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise { + return this.rpc.unary(HubServiceStopSyncDesc, Empty.fromPartial(request), metadata); + } + + forceSync(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise { + return this.rpc.unary(HubServiceForceSyncDesc, SyncStatusRequest.fromPartial(request), metadata); + } + getSyncStatus(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise { return this.rpc.unary(HubServiceGetSyncStatusDesc, SyncStatusRequest.fromPartial(request), metadata); } @@ -1240,6 +1257,52 @@ export const HubServiceGetCurrentPeersDesc: UnaryMethodDefinitionish = { } as any, }; +export const HubServiceStopSyncDesc: UnaryMethodDefinitionish = { + methodName: "StopSync", + service: HubServiceDesc, + requestStream: false, + responseStream: false, + requestType: { + serializeBinary() { + return Empty.encode(this).finish(); + }, + } as any, + responseType: { + deserializeBinary(data: Uint8Array) { + const value = SyncStatusResponse.decode(data); + return { + ...value, + toObject() { + return value; + }, + }; + }, + } as any, +}; + +export const HubServiceForceSyncDesc: UnaryMethodDefinitionish = { + methodName: "ForceSync", + service: HubServiceDesc, + requestStream: false, + responseStream: false, + requestType: { + serializeBinary() { + return SyncStatusRequest.encode(this).finish(); + }, + } as any, + responseType: { + deserializeBinary(data: Uint8Array) { + const value = SyncStatusResponse.decode(data); + return { + ...value, + toObject() { + return value; + }, + }; + }, + } as any, +}; + export const HubServiceGetSyncStatusDesc: UnaryMethodDefinitionish = { methodName: "GetSyncStatus", service: HubServiceDesc, diff --git a/protobufs/schemas/rpc.proto b/protobufs/schemas/rpc.proto index 812395b48f..dc2afe8101 100644 --- a/protobufs/schemas/rpc.proto +++ b/protobufs/schemas/rpc.proto @@ -98,6 +98,11 @@ service HubService { rpc GetInfo(HubInfoRequest) returns (HubInfoResponse); rpc GetCurrentPeers(Empty) returns (ContactInfoResponse); // @http-api: none + rpc StopSync(Empty) returns (SyncStatusResponse); + // This is experimental, do not rely on this endpoint existing in the future + // @http-api: none + rpc ForceSync(SyncStatusRequest) returns (SyncStatusResponse); + // @http-api: none rpc GetSyncStatus(SyncStatusRequest) returns (SyncStatusResponse); // @http-api: none rpc GetAllSyncIdsByPrefix(TrieNodePrefix) returns (SyncIds);