Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 129 additions & 96 deletions lib/Redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,115 +189,127 @@ class Redis extends Commander implements DataHandledable {

const { options } = this;

// Note that `this.condition` has to be set _before_ any asynchronous work
// takes place as the `select` value is required when queueing commands
// into the offline queue (see sendCommand)
Comment on lines +192 to +194
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another change from @slukes work. This allows pipelining and the offline queue to function.

this.condition = {
select: options.db,
auth: options.username
? [options.username, options.password]
: options.password,
subscriber: false,
};
this.resolvePassword((err, resolvedPassword) => {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a big change from @slukes work. Rather than use an async promise here, the resolvePassword logic tries to keep everything synchronous when it can. It's only when this.options.password returns a promise that this callback will be invoked in a separate tick.

If this is changed into async logic, there are a few tests that fail because there seems to be an expectation that the connect logic runs the connector which sets this.connecting = true before any other code runs.
e.g., the following test would fail when making resolvePassword async:

  it("connects successfully immediately after end", (done) => {
    const redis = new Redis();
    redis.once("end", async () => {
      await redis.connect();
      done();
    });

    redis.quit();
  });

The connect function (which is called from the constructor) calls into the connector and sets connecting to true on the connector. If that variable is no longer true in the next tick, it rejects the connect request and will trigger a transition into the end state.

However, if connect needs to resolve something else asynchronously, by the time redis.quit runs, the connector may not have run yet. The net effect of this is that the client will first disconnect and then re-connect, because the end-user has no control over the connect.

All of that said, this test could be changed to wait until the client is ready. Would that be breaking behaviour?

  it("connects successfully immediately after end", (done) => {
    const redis = new Redis();
    redis.once("end", async () => {
      await redis.connect();
      done();
    });

    redis.on("ready", () => redis.quit());
  });

if (err) {
this.flushQueue(err);
this.silentEmit("error", err);
this.setStatus("end");
reject(err);
return;
}
this.condition.auth = options.username
? [options.username, resolvedPassword]
: resolvedPassword

const _this = this;
asCallback(
this.connector.connect(function (type, err) {
_this.silentEmit(type, err);
}) as Promise<NetStream>,
function (err: Error | null, stream?: NetStream) {
if (err) {
_this.flushQueue(err);
_this.silentEmit("error", err);
reject(err);
_this.setStatus("end");
return;
}
let CONNECT_EVENT = options.tls ? "secureConnect" : "connect";
if (
"sentinels" in options &&
options.sentinels &&
!options.enableTLSForSentinelMode
) {
CONNECT_EVENT = "connect";
}

const _this = this;
asCallback(
this.connector.connect(function (type, err) {
_this.silentEmit(type, err);
}) as Promise<NetStream>,
function (err: Error | null, stream?: NetStream) {
if (err) {
_this.flushQueue(err);
_this.silentEmit("error", err);
reject(err);
_this.setStatus("end");
return;
}
let CONNECT_EVENT = options.tls ? "secureConnect" : "connect";
if (
"sentinels" in options &&
options.sentinels &&
!options.enableTLSForSentinelMode
) {
CONNECT_EVENT = "connect";
}

_this.stream = stream;
_this.stream = stream;

if (options.noDelay) {
stream.setNoDelay(true);
}
if (options.noDelay) {
stream.setNoDelay(true);
}

// Node ignores setKeepAlive before connect, therefore we wait for the event:
// https://github.com/nodejs/node/issues/31663
if (typeof options.keepAlive === "number") {
if (stream.connecting) {
stream.once(CONNECT_EVENT, () => {
// Node ignores setKeepAlive before connect, therefore we wait for the event:
// https://github.com/nodejs/node/issues/31663
if (typeof options.keepAlive === "number") {
if (stream.connecting) {
stream.once(CONNECT_EVENT, () => {
stream.setKeepAlive(true, options.keepAlive);
});
} else {
stream.setKeepAlive(true, options.keepAlive);
});
} else {
stream.setKeepAlive(true, options.keepAlive);
}
}
}

if (stream.connecting) {
stream.once(CONNECT_EVENT, eventHandler.connectHandler(_this));

if (options.connectTimeout) {
/*
* Typically, Socket#setTimeout(0) will clear the timer
* set before. However, in some platforms (Electron 3.x~4.x),
* the timer will not be cleared. So we introduce a variable here.
*
* See https://github.com/electron/electron/issues/14915
*/
let connectTimeoutCleared = false;
stream.setTimeout(options.connectTimeout, function () {
if (connectTimeoutCleared) {
return;
}
stream.setTimeout(0);
stream.destroy();

const err = new Error("connect ETIMEDOUT");
// @ts-expect-error
err.errorno = "ETIMEDOUT";
// @ts-expect-error
err.code = "ETIMEDOUT";
// @ts-expect-error
err.syscall = "connect";
eventHandler.errorHandler(_this)(err);
});
stream.once(CONNECT_EVENT, function () {
connectTimeoutCleared = true;
stream.setTimeout(0);
});
if (stream.connecting) {
stream.once(CONNECT_EVENT, eventHandler.connectHandler(_this));

if (options.connectTimeout) {
/*
* Typically, Socket#setTimeout(0) will clear the timer
* set before. However, in some platforms (Electron 3.x~4.x),
* the timer will not be cleared. So we introduce a variable here.
*
* See https://github.com/electron/electron/issues/14915
*/
let connectTimeoutCleared = false;
stream.setTimeout(options.connectTimeout, function () {
if (connectTimeoutCleared) {
return;
}
stream.setTimeout(0);
stream.destroy();

const err = new Error("connect ETIMEDOUT");
// @ts-expect-error
err.errorno = "ETIMEDOUT";
// @ts-expect-error
err.code = "ETIMEDOUT";
// @ts-expect-error
err.syscall = "connect";
eventHandler.errorHandler(_this)(err);
});
stream.once(CONNECT_EVENT, function () {
connectTimeoutCleared = true;
stream.setTimeout(0);
});
}
} else if (stream.destroyed) {
const firstError = _this.connector.firstError;
if (firstError) {
process.nextTick(() => {
eventHandler.errorHandler(_this)(firstError);
});
}
process.nextTick(eventHandler.closeHandler(_this));
} else {
process.nextTick(eventHandler.connectHandler(_this));
}
} else if (stream.destroyed) {
const firstError = _this.connector.firstError;
if (firstError) {
process.nextTick(() => {
eventHandler.errorHandler(_this)(firstError);
});
if (!stream.destroyed) {
stream.once("error", eventHandler.errorHandler(_this));
stream.once("close", eventHandler.closeHandler(_this));
}
process.nextTick(eventHandler.closeHandler(_this));
} else {
process.nextTick(eventHandler.connectHandler(_this));
}
if (!stream.destroyed) {
stream.once("error", eventHandler.errorHandler(_this));
stream.once("close", eventHandler.closeHandler(_this));
}

const connectionReadyHandler = function () {
_this.removeListener("close", connectionCloseHandler);
resolve();
};
var connectionCloseHandler = function () {
_this.removeListener("ready", connectionReadyHandler);
reject(new Error(CONNECTION_CLOSED_ERROR_MSG));
};
_this.once("ready", connectionReadyHandler);
_this.once("close", connectionCloseHandler);
}
);
const connectionReadyHandler = function () {
_this.removeListener("close", connectionCloseHandler);
resolve();
};
var connectionCloseHandler = function () {
_this.removeListener("ready", connectionReadyHandler);
reject(new Error(CONNECTION_CLOSED_ERROR_MSG));
};
_this.once("ready", connectionReadyHandler);
_this.once("close", connectionCloseHandler);
}
);
});
});

return asCallback(promise, callback);
Expand Down Expand Up @@ -862,6 +874,27 @@ class Redis extends Commander implements DataHandledable {
}
}).catch(noop);
}

private resolvePassword(callback: (err: Error | null, password?: string | null) => void) {
const { password } = this.options;
if (!password) {
return callback(null, null);
}
if (typeof password === 'function') {
let p: ReturnType<typeof password> = null;
try {
p = password();
} catch (err) {
return callback(err);
}
if (typeof p === 'string' || !p) {
return callback(null, p as string);
}
return p.then((pw) => callback(null, pw), callback);
}

return callback(null, password);
}
}

interface Redis extends EventEmitter {
Expand Down
2 changes: 1 addition & 1 deletion lib/cluster/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export interface RedisOptions {
port: number;
host: string;
username?: string;
password?: string;
password?: string | (() => Promise<string> | string);
[key: string]: any;
}

Expand Down
24 changes: 18 additions & 6 deletions lib/connectors/SentinelConnector/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export interface SentinelConnectionOptions {
role?: "master" | "slave";
tls?: ConnectionOptions;
sentinelUsername?: string;
sentinelPassword?: string;
sentinelPassword?: string | (() => Promise<string> | string);
sentinels?: Array<Partial<SentinelAddress>>;
sentinelRetryStrategy?: (retryAttempts: number) => number | void | null;
sentinelReconnectStrategy?: (retryAttempts: number) => number | void | null;
Expand Down Expand Up @@ -294,15 +294,27 @@ export default class SentinelConnector extends AbstractConnector {
return result;
}

private connectToSentinel(
private async resolveSentinelPassword(): Promise<string | null> {
const { sentinelPassword } = this.options;
if (!sentinelPassword) {
return null;
}
if (typeof sentinelPassword === "function") {
return await sentinelPassword();
}
return sentinelPassword;
}

private async connectToSentinel(
endpoint: Partial<SentinelAddress>,
options?: Partial<RedisOptions>
): RedisClient {
): Promise<RedisClient> {
const resolvedPassword = await this.resolveSentinelPassword();
const redis = new Redis({
port: endpoint.port || 26379,
host: endpoint.host,
username: this.options.sentinelUsername || null,
password: this.options.sentinelPassword || null,
password: resolvedPassword,
family:
endpoint.family ||
// @ts-expect-error
Expand All @@ -324,7 +336,7 @@ export default class SentinelConnector extends AbstractConnector {
private async resolve(
endpoint: Partial<SentinelAddress>
): Promise<TcpNetConnectOpts | null> {
const client = this.connectToSentinel(endpoint);
const client = await this.connectToSentinel(endpoint);

// ignore the errors since resolve* methods will handle them
client.on("error", noop);
Expand Down Expand Up @@ -357,7 +369,7 @@ export default class SentinelConnector extends AbstractConnector {
break;
}

const client = this.connectToSentinel(value, {
const client = await this.connectToSentinel(value, {
lazyConnect: true,
retryStrategy: this.options.sentinelReconnectStrategy,
});
Expand Down
3 changes: 2 additions & 1 deletion lib/redis/RedisOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ export interface CommonRedisOptions extends CommanderOptions {

/**
* If set, client will send AUTH command with the value of this option when connected.
* Can be a string or a function that returns a string or Promise<string>.
*/
password?: string;
password?: string | (() => Promise<string> | string);

/**
* Database index to use.
Expand Down
Loading