From 14552367bd835f354365aa0315147dd1de6debdc Mon Sep 17 00:00:00 2001 From: Hristo Temelski Date: Mon, 16 Jun 2025 14:10:22 +0300 Subject: [PATCH 1/2] Force slots refresh on MOVED error when using ssubscribe --- lib/DataHandler.ts | 5 +++++ lib/cluster/ClusterSubscriber.ts | 4 ++++ lib/cluster/ClusterSubscriberGroup.ts | 6 +++++- lib/cluster/index.ts | 2 +- 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/lib/DataHandler.ts b/lib/DataHandler.ts index d9030517..fb635944 100644 --- a/lib/DataHandler.ts +++ b/lib/DataHandler.ts @@ -80,6 +80,11 @@ export default class DataHandler { args: item.command.args, }; + if (item.command.name == "ssubscribe" && err.message.includes("MOVED")) { + this.redis.emit("moved"); + return; + } + this.redis.handleReconnection(err, item); } diff --git a/lib/cluster/ClusterSubscriber.ts b/lib/cluster/ClusterSubscriber.ts index 5c728560..89b17508 100644 --- a/lib/cluster/ClusterSubscriber.ts +++ b/lib/cluster/ClusterSubscriber.ts @@ -160,6 +160,10 @@ export default class ClusterSubscriber { // Ignore the errors since they're handled in the connection pool. this.subscriber.on("error", noop); + this.subscriber.on("moved", () => { + this.emitter.emit("forceRefresh"); + }); + // The node we lost connection to may not come back up in a // reasonable amount of time (e.g. a slave that's taken down // for maintainence), we could potentially miss many published diff --git a/lib/cluster/ClusterSubscriberGroup.ts b/lib/cluster/ClusterSubscriberGroup.ts index b55b0482..be85d05f 100644 --- a/lib/cluster/ClusterSubscriberGroup.ts +++ b/lib/cluster/ClusterSubscriberGroup.ts @@ -31,7 +31,7 @@ export default class ClusterSubscriberGroup { * * @param cluster */ - constructor(private cluster: Cluster) { + constructor(private cluster: Cluster, refreshSlotsCacheCallback: () => void) { cluster.on("+node", (redis) => { this._addSubscriber(redis); @@ -44,6 +44,10 @@ export default class ClusterSubscriberGroup { cluster.on("refresh", () => { this._refreshSlots(cluster); }); + + cluster.on("forceRefresh", () => { + refreshSlotsCacheCallback(); + }); } diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index a046c3b6..b598a161 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -126,7 +126,7 @@ class Cluster extends Commander { this.options = defaults({}, options, DEFAULT_CLUSTER_OPTIONS, this.options); if (this.options.shardedSubscribers == true) - this.shardedSubscribers = new ClusterSubscriberGroup(this); + this.shardedSubscribers = new ClusterSubscriberGroup(this, this.refreshSlotsCache.bind(this)); if ( this.options.redisOptions && From 0766449eead734d6e4ebc66f7bf683b9b92e4594 Mon Sep 17 00:00:00 2001 From: Hristo Temelski Date: Tue, 17 Jun 2025 11:09:42 +0300 Subject: [PATCH 2/2] enabled cluster tests --- .github/workflows/test.yml | 1 + package.json | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5f99d135..d407e2c3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -33,3 +33,4 @@ jobs: - run: npm run build - run: npm run test:tsd - run: npm run test:cov || npm run test:cov || npm run test:cov + - run: npm run test:js || npm run test:js:cluster diff --git a/package.json b/package.json index 5b5a65b7..e1a495e2 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,7 @@ "test:js": "TS_NODE_TRANSPILE_ONLY=true NODE_ENV=test mocha \"test/helpers/*.ts\" \"test/unit/**/*.ts\" \"test/functional/**/*.ts\"", "test:cov": "nyc npm run test:js", "test:js:cluster": "TS_NODE_TRANSPILE_ONLY=true NODE_ENV=test mocha \"test/cluster/**/*.ts\"", - "test": "npm run test:js && npm run test:tsd", + "test": "npm run test:js && npm run test:tsd && npm run test:js:cluster", "lint": "eslint --ext .js,.ts ./lib", "docs": "npx typedoc --logLevel Error --excludeExternals --excludeProtected --excludePrivate --readme none lib/index.ts", "format": "prettier --write \"{,!(node_modules)/**/}*.{js,ts}\"",