Skip to content

interceptor for pre and post work around commands #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions lib/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/// <reference types="node" />
import { Redis as _Redis, Cluster } from "ioredis";
import IoRedis, { Redis as _Redis, Cluster } from "ioredis";
import EventEmitter from "events";
import { Registry, Counter, Histogram } from "prom-client";
interface RedisConfig {
/** provide host ip/url, default - localhost */
host?: string;
Expand Down Expand Up @@ -45,15 +46,43 @@ declare class Redis {
emitter: EventEmitter;
config: RedisConfig;
client: Cluster | _Redis;
commandTimeout?: number;
metrics?: {
register: Registry;
labels: {
[key: string]: string;
};
};
trackers?: {
commands?: Counter;
errors?: Counter;
latencies?: Histogram;
};
/**
* @param {string} name - unique name to this service
* @param {EventEmitter} emitter
* @param {RedisConfig} config - configuration object of service
* @param {Registry} metrics - prometheus client
*/
constructor(name: string, emitter: EventEmitter, config: RedisConfig);
constructor(name: string, emitter: EventEmitter, config: RedisConfig, metrics?: {
register: Registry;
labels: {
[key: string]: string;
};
});
log(message: string, data: unknown): void;
success(message: string, data: unknown): void;
error(err: Error, data: unknown): void;
makeError(message: string, data: unknown): Error;
trackCommand(command: string): void;
trackErrors(command: string, errorMessage: string): void;
trackLatencies(command: string, startTime: number): void;
createTimeoutPromise(ms: number, command: string): {
timeoutPromise: Promise<unknown>;
clear: () => void;
};
executeCommand(target: any, prop: any, args: any): Promise<unknown>;
makeProxy(client: Cluster | _Redis): Cluster | IoRedis;
/**
* Connect to redis server with the config
*
Expand Down
138 changes: 136 additions & 2 deletions lib/index.js

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
},
"homepage": "https://github.com/akshendra/redis-wrapper#readme",
"dependencies": {
"ioredis": "5.3.2"
"@ioredis/commands": "^1.2.0",
"ioredis": "5.3.2",
"prom-client": "^15.0.0"
},
"devDependencies": {
"@types/node": "^16.11.7",
Expand All @@ -36,4 +38,4 @@
"eslint-plugin-import": "^2.23.4",
"typescript": "^4.4.4"
}
}
}
180 changes: 179 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import IoRedis, {
RedisOptions,
} from "ioredis";
import EventEmitter from "events";
import { exists as isCommand } from "@ioredis/commands";
import { Registry, Counter, Histogram } from "prom-client";
import { performance } from "perf_hooks";

function retryStrategy(times: number): number {
if (times > 1000) {
Expand Down Expand Up @@ -67,15 +70,66 @@ class Redis {
emitter: EventEmitter;
config: RedisConfig;
client: Cluster | _Redis;
commandTimeout?: number;
metrics?: {
register: Registry;
labels: { [key: string]: string };
};
trackers?: { commands?: Counter; errors?: Counter; latencies?: Histogram };

/**
* @param {string} name - unique name to this service
* @param {EventEmitter} emitter
* @param {RedisConfig} config - configuration object of service
* @param {Registry} metrics - prometheus client
*/
constructor(name: string, emitter: EventEmitter, config: RedisConfig) {
constructor(
name: string,
emitter: EventEmitter,
config: RedisConfig,
metrics?: {
register: Registry;
labels: { [key: string]: string };
}
) {
this.name = name;
this.emitter = emitter;
this.commandTimeout = config.commandTimeout;
this.metrics = metrics;

if (this.metrics) {
// register counters
this.trackers = {};

// create counter for tracking the number of times redis commands are called
this.trackers.commands = new Counter({
name: `${this.name.replaceAll("-", "_")}:commands`,
help: "keep track of all redis commands",
labelNames: [...Object.keys(this.metrics.labels), "command"],
registers: [this.metrics.register],
});

// create counter for tracking the number of times redis commands have failed
this.trackers.errors = new Counter({
name: `${this.name.replaceAll("-", "_")}:errors`,
help: "keep track of all redis command errors",
labelNames: [
...Object.keys(this.metrics.labels),
"command",
"errorMessage",
],
registers: [this.metrics.register],
});

// create histogram for tracking latencies of redis commands
this.trackers.latencies = new Histogram({
name: `${this.name.replaceAll("-", "_")}:latencies`,
help: "keep track of redis command latencies",
labelNames: [...Object.keys(this.metrics.labels), "command"],
registers: [this.metrics.register],
});
}

this.config = Object.assign(
{
host: "localhost",
Expand Down Expand Up @@ -131,6 +185,128 @@ class Redis {
});
}

makeError(message: string, data: unknown): Error {
const error = new Error(message);
this.error(error, data);
return error;
}

trackCommand(command: string): void {
if (this.trackers?.commands) {
this.trackers.commands.inc(
{
...this.metrics.labels,
command,
},
1
);
}
}

trackErrors(command: string, errorMessage: string): void {
if (this.trackers?.errors) {
this.trackers.errors.inc(
{
...this.metrics.labels,
command,
errorMessage,
},
1
);
}
}

trackLatencies(command: string, startTime: number): void {
if (this.trackers?.latencies) {
const endTime = performance.now();
this.trackers.latencies.observe(
{
...this.metrics.labels,
command,
},
endTime - startTime
);
}
}

createTimeoutPromise(
ms: number,
command: string
): {
timeoutPromise: Promise<unknown>;
clear: () => void;
} {
let timeoutId: NodeJS.Timeout;
const timeoutPromise = new Promise((_, reject) => {
timeoutId = setTimeout(() => {
reject(
this.makeError("redis.COMMAND_TIMEOUT", {
command,
timeout: ms,
})
);
}, ms);
});
return {
timeoutPromise,
clear: () => {
clearTimeout(timeoutId);
},
};
}

async executeCommand(target, prop, args): Promise<unknown> {
const startTime = performance.now();
try {
this.trackCommand(String(prop));
const result = await target[prop](...args);
this.trackLatencies(String(prop), startTime);
return result;
} catch (err) {
this.trackLatencies(String(prop), startTime);
this.trackErrors(String(prop), err.message);
throw this.makeError("redis.COMMAND_ERROR", {
command: prop,
args,
error: err,
});
}
}

makeProxy(client: Cluster | _Redis) {
return new Proxy(client, {
get: (target, prop) => {
// check if a command or not
if (!isCommand(String(prop))) {
return target[prop];
}

// check if client in ready state
if (this.client.status !== "ready") {
throw this.makeError("redis.NOT_READY", {
command: prop,
});
}

return (...args: unknown[]): Promise<unknown> => {
// If timeout is set, apply Promise.race
if (this.client.isCluster && this.commandTimeout) {
const { timeoutPromise, clear } = this.createTimeoutPromise(
this.commandTimeout,
String(prop)
);
return Promise.race([
this.executeCommand(target, prop, args),
timeoutPromise,
]).finally(clear);
} else {
return this.executeCommand(target, prop, args);
}
};
},
});
}

/**
* Connect to redis server with the config
*
Expand Down Expand Up @@ -236,6 +412,8 @@ class Redis {

this.log(`Connecting in ${infoObj.mode} mode`, infoObj);

client = this.makeProxy(client);

// common events
client.on("connect", () => {
this.success(`Successfully connected in ${infoObj.mode} mode`, null);
Expand Down
Loading