Skip to content

Commit eda55a0

Browse files
committed
fix: prevent race condition when subscribe command issued during connect
1 parent f33a2c8 commit eda55a0

File tree

3 files changed

+36
-6
lines changed

3 files changed

+36
-6
lines changed

lib/DataHandler.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ export interface Condition {
1414
select: number;
1515
auth?: string | [string, string];
1616
subscriber: false | SubscriptionSet;
17+
/**
18+
* Whether the connection has issued a subscribe command during `connect` or `ready`.
19+
*/
20+
hasIssuedSubscribe: boolean;
1721
}
1822

1923
export type FlushQueueOptions = {
@@ -89,6 +93,7 @@ export default class DataHandler {
8993
}
9094

9195
private returnReply(reply: ReplyData) {
96+
console.log('reply');
9297
if (this.handleMonitorReply(reply)) {
9398
return;
9499
}
@@ -97,6 +102,7 @@ export default class DataHandler {
97102
}
98103

99104
const item = this.shiftCommand(reply);
105+
console.log('reply command', item.command.name);
100106
if (!item) {
101107
return;
102108
}

lib/Redis.ts

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ class Redis extends Commander implements DataHandledable {
195195
? [options.username, options.password]
196196
: options.password,
197197
subscriber: false,
198+
hasIssuedSubscribe: false,
198199
};
199200

200201
const _this = this;
@@ -429,6 +430,17 @@ class Redis extends Commander implements DataHandledable {
429430
command.reject(new Error(CONNECTION_CLOSED_ERROR_MSG));
430431
return command.promise;
431432
}
433+
434+
// Make sure know that a subscribe command is sent to the server
435+
// In order to prevent race condition by sending another non-subscribe command
436+
// before we've received the response of the previous subscribe command
437+
if (
438+
(this.status === "connect" || this.status === "ready") &&
439+
Command.checkFlag("ENTER_SUBSCRIBER_MODE", command.name)
440+
) {
441+
this.condition.hasIssuedSubscribe = true;
442+
}
443+
432444
if (
433445
this.condition?.subscriber &&
434446
!Command.checkFlag("VALID_IN_SUBSCRIBER_MODE", command.name)
@@ -527,7 +539,10 @@ class Redis extends Commander implements DataHandledable {
527539
this.manuallyClosing = true;
528540
}
529541

530-
if (this.options.socketTimeout !== undefined && this.socketTimeoutTimer === undefined) {
542+
if (
543+
this.options.socketTimeout !== undefined &&
544+
this.socketTimeoutTimer === undefined
545+
) {
531546
this.setSocketTimeout();
532547
}
533548
}
@@ -546,7 +561,11 @@ class Redis extends Commander implements DataHandledable {
546561

547562
private setSocketTimeout() {
548563
this.socketTimeoutTimer = setTimeout(() => {
549-
this.stream.destroy(new Error(`Socket timeout. Expecting data, but didn't receive any in ${this.options.socketTimeout}ms.`));
564+
this.stream.destroy(
565+
new Error(
566+
`Socket timeout. Expecting data, but didn't receive any in ${this.options.socketTimeout}ms.`
567+
)
568+
);
550569
this.socketTimeoutTimer = undefined;
551570
}, this.options.socketTimeout);
552571

lib/redis/event_handler.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,14 @@ export function connectHandler(self) {
7777

7878
const clientCommandPromises = [];
7979

80-
if (self.options.connectionName) {
80+
if (self.options.connectionName && !self.condition.hasIssuedSubscribe) {
8181
debug("set the connection name [%s]", self.options.connectionName);
8282
clientCommandPromises.push(
8383
self.client("setname", self.options.connectionName).catch(noop)
8484
);
8585
}
8686

87-
if (!self.options.disableClientInfo) {
87+
if (!self.options.disableClientInfo && !self.condition.hasIssuedSubscribe) {
8888
debug("set the client info");
8989
clientCommandPromises.push(
9090
getPackageMeta()
@@ -112,11 +112,16 @@ export function connectHandler(self) {
112112
Promise.all(clientCommandPromises)
113113
.catch(noop)
114114
.finally(() => {
115-
if (!self.options.enableReadyCheck) {
115+
// Ready check should not be performed after a subscribe command
116+
// Because it might result in a race condition
117+
// Additionally because we're using RESP2 another client should be used for normal commands
118+
const shouldReadyCheck = self.options.enableReadyCheck && !self.condition.hasIssuedSubscribe;
119+
120+
if (!shouldReadyCheck) {
116121
exports.readyHandler(self)();
117122
}
118123

119-
if (self.options.enableReadyCheck) {
124+
if (shouldReadyCheck) {
120125
self._readyCheck(function (err, info) {
121126
if (connectionEpoch !== self.connectionEpoch) {
122127
return;

0 commit comments

Comments
 (0)