Skip to content

[CAE-342] Fixed issue with sentinel connect #2905

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 41 additions & 11 deletions packages/client/lib/sentinel/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async function steadyState(frame: SentinelFramework) {
}

["redis-sentinel-test-password", undefined].forEach(function (password) {
describe.skip(`Sentinel - password = ${password}`, () => {
describe(`Sentinel - password = ${password}`, () => {
const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: password };
const frame = new SentinelFramework(config);
let tracer = new Array<string>();
Expand Down Expand Up @@ -197,6 +197,11 @@ async function steadyState(frame: SentinelFramework) {

await assert.doesNotReject(sentinel.get('x'));
});

it('failed to connect', async function() {
sentinel = frame.getSentinelClient({sentinelRootNodes: [{host: "127.0.0.1", port: 1010}], maxCommandRediscovers: 0})
await assert.rejects(sentinel.connect());
});

it('try to connect multiple times', async function () {
sentinel = frame.getSentinelClient();
Expand Down Expand Up @@ -341,18 +346,25 @@ async function steadyState(frame: SentinelFramework) {
this.timeout(30000);

sentinel = frame.getSentinelClient({ replicaPoolSize: 1 });
sentinel.on("error", () => { });
sentinel.on("error", (e) => {
// console.log("some err %s", e)
});
await sentinel.connect();

await sentinel.use(
"asd",
async (client: RedisSentinelClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>, ) => {
const masterNode = sentinel!.getMasterNode();
console.log("stopping")
await frame.stopNode(masterNode!.port.toString());
console.log("stopped")
await assert.doesNotReject(client.get('x'));
console.log("final await")
}
);
});

// TODO: figure out why it fails
it('use with script', async function () {
this.timeout(10000);

Expand All @@ -366,9 +378,13 @@ async function steadyState(frame: SentinelFramework) {
await sentinel.connect();

const reply = await sentinel.use(
"dsf",
async (client: RedisSentinelClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => {
console.log("set")
assert.equal(await client.set('key', '2'), 'OK');
console.log("get")
assert.equal(await client.get('key'), '2');
console.log("square")
return client.square('key')
}
);
Expand All @@ -393,6 +409,7 @@ async function steadyState(frame: SentinelFramework) {
);

const reply = await sentinel.use(
"asdf",
async (client: RedisSentinelClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => {
await client.set('key', '2');
return client.math.square('key');
Expand All @@ -410,6 +427,7 @@ async function steadyState(frame: SentinelFramework) {
await sentinel.connect();

const reply = await sentinel.use(
"asdf",
async (client: RedisSentinelClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => {
return client.bf.add('key', 'item');
}
Expand All @@ -426,6 +444,7 @@ async function steadyState(frame: SentinelFramework) {
await sentinel.connect();

const promise = sentinel.use(
"asdf",
async client => {
await setTimeout(1000);
return await client.get("x");
Expand All @@ -436,13 +455,15 @@ async function steadyState(frame: SentinelFramework) {
assert.equal(await promise, null);
});

// TODO: figure out why it fails
it('reserve client, takes a client out of pool', async function () {
this.timeout(30000);
this.timeout(60000);

sentinel = frame.getSentinelClient({ masterPoolSize: 2, reserveClient: true });
await sentinel.connect();

const promise1 = sentinel.use(
"asdf",
async client => {
const val = await client.get("x");
await client.set("x", 2);
Expand All @@ -451,6 +472,7 @@ async function steadyState(frame: SentinelFramework) {
)

const promise2 = sentinel.use(
"asdg",
async client => {
return client.get("x");
}
Expand All @@ -471,6 +493,7 @@ async function steadyState(frame: SentinelFramework) {
let set = false;

const promise = sentinel.use(
"asdf",
async client => {
await sentinel!.set("x", 1);
await client.get("x");
Expand All @@ -480,9 +503,10 @@ async function steadyState(frame: SentinelFramework) {
await assert.doesNotReject(promise);
});

// TODO: figure out why it fails
// by taking a lease, we know we will block on master as no clients are available, but as read occuring, means replica read occurs
it('replica reads', async function () {
this.timeout(30000);
this.timeout(60000);

sentinel = frame.getSentinelClient({ replicaPoolSize: 1 });
sentinel.on("error", () => { });
Expand Down Expand Up @@ -525,7 +549,8 @@ async function steadyState(frame: SentinelFramework) {
sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
await sentinel.connect();

let promise = sentinel.use(async (client) => {
let promise = sentinel.use(
"asdf",async (client) => {
await client.set("x", 1);
await client.watch("x");
return client.multi().get("x").exec();
Expand All @@ -540,7 +565,8 @@ async function steadyState(frame: SentinelFramework) {
sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
await sentinel.connect();

let promise = sentinel.use(async (client) => {
let promise = sentinel.use(
"asdg",async (client) => {
await client.set('x', 1);
await client.watch('x');
await sentinel!.set('x', 2);
Expand Down Expand Up @@ -579,9 +605,12 @@ async function steadyState(frame: SentinelFramework) {
await sentinel.connect();

// each of these commands is an independent lease
assert.equal(await sentinel.use(client => client.watch("x")), 'OK')
assert.equal(await sentinel.use(client => client.set('x', 1)), 'OK');
assert.deepEqual(await sentinel.use(client => client.multi().get('x').exec()), ['1']);
assert.equal(await sentinel.use(
"asdf",client => client.watch("x")), 'OK')
assert.equal(await sentinel.use(
"asdg",client => client.set('x', 1)), 'OK');
assert.deepEqual(await sentinel.use(
"asdh",client => client.multi().get('x').exec()), ['1']);
});

// stops master to force sentinel to update
Expand Down Expand Up @@ -718,8 +747,9 @@ async function steadyState(frame: SentinelFramework) {
tracer.push("multi was rejected");
});

// TODO: figure out why it fails
it('plain pubsub - channel', async function () {
this.timeout(30000);
this.timeout(60000);

sentinel = frame.getSentinelClient();
sentinel.setTracer(tracer);
Expand Down Expand Up @@ -1138,7 +1168,7 @@ async function steadyState(frame: SentinelFramework) {
})
})

describe('Sentinel Factory', function () {
describe.skip('Sentinel Factory', function () {
let master: RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined;
let replica: RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined;

Expand Down
8 changes: 5 additions & 3 deletions packages/client/lib/sentinel/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,14 +404,15 @@ export default class RedisSentinel<
}
}

async use<T>(fn: (sentinelClient: RedisSentinelClientType<M, F, S, RESP, TYPE_MAPPING>) => Promise<T>) {
async use<T>(id: string, fn: (sentinelClient: RedisSentinelClientType<M, F, S, RESP, TYPE_MAPPING>) => Promise<T>) {
const clientInfo = await this._self.#internal.getClientLease();

console.log("leased client: %d from %s", clientInfo.id, id)
try {
return await fn(
RedisSentinelClient.create(this._self.#options, this._self.#internal, clientInfo, this._self.#commandOptions)
);
} finally {
console.log("released client: %d from %s", clientInfo.id, id)
const promise = this._self.#internal.releaseClientLease(clientInfo);
if (promise) await promise;
}
Expand Down Expand Up @@ -683,6 +684,7 @@ class RedisSentinelInternal<
async #connect() {
let count = 0;
while (true) {
count += 1;
this.#trace("starting connect loop");

if (this.#destroy) {
Expand Down Expand Up @@ -734,7 +736,7 @@ class RedisSentinelInternal<
}
const sockOpts = client.options?.socket as TcpNetConnectOpts | undefined;
this.#trace("attemping to send command to " + sockOpts?.host + ":" + sockOpts?.port)

console.log("attempting to send command to %s:%s from %s", sockOpts?.host, sockOpts?.port, clientInfo?.id)
try {
/*
// force testing of READONLY errors
Expand Down
11 changes: 8 additions & 3 deletions packages/client/lib/sentinel/test-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,14 @@ export class SentinelFramework extends DockerBase {
RedisScripts,
RespVersions,
TypeMapping>>, errors = true) {
if (opts?.sentinelRootNodes !== undefined) {
throw new Error("cannot specify sentinelRootNodes here");
}
// remove this safeguard
// we want to test the case when
// we cannot connect to sentinel

// if (opts?.sentinelRootNodes !== undefined) {
// throw new Error("cannot specify sentinelRootNodes here");
// }

if (opts?.name !== undefined) {
throw new Error("cannot specify sentinel db name here");
}
Expand Down
Loading