Skip to content

Commit

Permalink
refactor: cleanup readReply() logic (#181)
Browse files Browse the repository at this point in the history
* refactor: cleanup `readReply()` logic

* simplify `readNReplies()`

* fix `RedisClient.prototype.writeCommand()` return type

* update `deno.json`
  • Loading branch information
iuioiua authored Dec 16, 2024
1 parent a415cb7 commit 26494d7
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 169 deletions.
3 changes: 1 addition & 2 deletions deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
"@std/bytes": "jsr:@std/bytes@^0.220.1",
"@std/collections": "jsr:@std/collections@^0.220.1",
"@std/fmt": "jsr:@std/fmt@^0.220.1",
"@std/io": "jsr:@std/io@^0.220.1",
"jsr:@iuioiua/r2d2/": "./"
"@std/io": "jsr:@std/io@^0.220.1"
},
"tasks": {
"redis:start": "redis-server --save \"\" --appendonly no --daemonize yes",
Expand Down
242 changes: 75 additions & 167 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ export type Reply =

const encoder = new TextEncoder();
const decoder = new TextDecoder();
const CRLF = "\r\n";
const CRLF_RAW = encoder.encode(CRLF);

const CRLF_STRING = "\r\n";
const ARRAY_PREFIX_STRING = "*";
const BULK_STRING_PREFIX_STRING = "$";

const CRLF = encoder.encode(CRLF_STRING);
const ARRAY_PREFIX = ARRAY_PREFIX_STRING.charCodeAt(0);
const ATTRIBUTE_PREFIX = "|".charCodeAt(0);
const BIG_NUMBER_PREFIX = "(".charCodeAt(0);
Expand All @@ -66,16 +66,20 @@ const VERBATIM_STRING_PREFIX = "=".charCodeAt(0);
* @see {@link https://redis.io/docs/reference/protocol-spec/#send-commands-to-a-redis-server}
*/
function createRequest(command: Command): Uint8Array {
const lines = [encoder.encode(ARRAY_PREFIX_STRING + command.length + CRLF)];
const lines = [
encoder.encode(ARRAY_PREFIX_STRING + command.length + CRLF_STRING),
];
for (const arg of command) {
const bytes = arg instanceof Uint8Array
? arg
: encoder.encode(arg.toString());
lines.push(
encoder.encode(BULK_STRING_PREFIX_STRING + bytes.byteLength + CRLF),
encoder.encode(
BULK_STRING_PREFIX_STRING + bytes.byteLength + CRLF_STRING,
),
bytes,
CRLF,
);
lines.push(bytes);
lines.push(CRLF_RAW);
}
return concat(lines);
}
Expand All @@ -87,110 +91,12 @@ async function writeCommand(
await writeAll(writer, createRequest(command));
}

function removePrefix(line: Uint8Array): string {
return decoder.decode(line.slice(1));
}

function toObject(array: any[]): Record<string, any> {
return Object.fromEntries(chunk(array, 2));
}

async function readNReplies(
function readNReplies(
length: number,
iterator: AsyncIterableIterator<Uint8Array>,
raw = false,
): Promise<Reply[]> {
const replies: Reply[] = [];
for (let i = 0; i < length; i++) {
replies.push(await readReply(iterator, raw));
}
return replies;
}

async function readArray(
line: Uint8Array,
iterator: AsyncIterableIterator<Uint8Array>,
): Promise<null | Reply[]> {
const length = readNumberOrDouble(line);
return length === -1 ? null : await readNReplies(length, iterator);
}

/**
* Read but don't return attribute data.
*
* @todo include attribute data somehow
*/
async function readAttribute(
line: Uint8Array,
iterator: AsyncIterableIterator<Uint8Array>,
raw = false,
): Promise<null | Reply> {
await readMap(line, iterator);
return await readReply(iterator, raw);
}

function readBigNumber(line: Uint8Array): bigint {
return BigInt(removePrefix(line));
}

async function readBlobError(
iterator: AsyncIterableIterator<Uint8Array>,
): Promise<never> {
/** Skip to reading the next line, which is a string */
const { value } = await iterator.next();
return await Promise.reject(decoder.decode(value));
}

function readBoolean(line: Uint8Array): boolean {
return removePrefix(line) === "t";
}

async function readBulkOrVerbatimString(
line: Uint8Array,
iterator: AsyncIterableIterator<Uint8Array>,
raw = false,
): Promise<string | null> {
if (readNumberOrDouble(line) === -1) {
return null;
}
const { value } = await iterator.next();
return raw ? value : decoder.decode(value);
}

async function readError(line: Uint8Array): Promise<never> {
return await Promise.reject(removePrefix(line));
}

async function readMap(
line: Uint8Array,
iterator: AsyncIterableIterator<Uint8Array>,
): Promise<Record<string, any>> {
const length = readNumberOrDouble(line) * 2;
const array = await readNReplies(length, iterator);
return toObject(array);
}

function readNumberOrDouble(line: Uint8Array): number {
const number = removePrefix(line);
switch (number) {
case "inf":
return Infinity;
case "-inf":
return -Infinity;
default:
return Number(number);
}
}

async function readSet(
line: Uint8Array,
iterator: AsyncIterableIterator<Uint8Array>,
): Promise<Set<Reply>> {
return new Set(await readArray(line, iterator));
}

function readSimpleString(line: Uint8Array): string {
return removePrefix(line);
return Array.fromAsync({ length }, () => readReply(iterator, raw));
}

/**
Expand All @@ -206,36 +112,66 @@ export async function readReply(
): Promise<Reply> {
const { value } = await iterator.next();
if (value.length === 0) {
return await Promise.reject(new TypeError("No reply received"));
return Promise.reject(new TypeError("No reply received"));
}
const line = decoder.decode(value.slice(1));
switch (value[0]) {
case ARRAY_PREFIX:
case PUSH_PREFIX:
return await readArray(value, iterator);
case ATTRIBUTE_PREFIX:
return await readAttribute(value, iterator);
case PUSH_PREFIX: {
const length = Number(line);
return length === -1 ? null : await readNReplies(length, iterator);
}
case ATTRIBUTE_PREFIX: {
/**
* Read but don't return attribute data.
*
* @todo include attribute data somehow
*/
const length = Number(line) * 2;
await readNReplies(length, iterator);
return readReply(iterator, raw);
}
case BIG_NUMBER_PREFIX:
return readBigNumber(value);
case BLOB_ERROR_PREFIX:
return readBlobError(iterator);
return BigInt(line);
case BLOB_ERROR_PREFIX: {
/** Skip to reading the next line, which is a string */
const { value } = await iterator.next();
return Promise.reject(decoder.decode(value));
}
case BOOLEAN_PREFIX:
return readBoolean(value);
return line === "t";
case BULK_STRING_PREFIX:
case VERBATIM_STRING_PREFIX:
return await readBulkOrVerbatimString(value, iterator, raw);
case VERBATIM_STRING_PREFIX: {
if (Number(line) === -1) {
return null;
}
const { value } = await iterator.next();
return raw ? value : decoder.decode(value);
}
case DOUBLE_PREFIX:
case INTEGER_PREFIX:
return readNumberOrDouble(value);
case INTEGER_PREFIX: {
switch (line) {
case "inf":
return Infinity;
case "-inf":
return -Infinity;
default:
return Number(line);
}
}
case ERROR_PREFIX:
return readError(value);
case MAP_PREFIX:
return await readMap(value, iterator);
return Promise.reject(line);
case MAP_PREFIX: {
const length = Number(line) * 2;
const array = await readNReplies(length, iterator);
return Object.fromEntries(chunk(array, 2));
}
case NULL_PREFIX:
return null;
case SET_PREFIX:
return await readSet(value, iterator);
return new Set(await readNReplies(Number(line), iterator, raw));
case SIMPLE_STRING_PREFIX:
return readSimpleString(value);
return line;
/** No prefix */
default:
return decoder.decode(value);
Expand All @@ -248,7 +184,7 @@ async function sendCommand(
raw = false,
): Promise<Reply> {
await writeCommand(redisConn, command);
return await readReply(readDelim(redisConn, CRLF_RAW), raw);
return readReply(readDelim(redisConn, CRLF), raw);
}

async function pipelineCommands(
Expand All @@ -257,53 +193,31 @@ async function pipelineCommands(
): Promise<Reply[]> {
const bytes = commands.map(createRequest);
await writeAll(redisConn, concat(bytes));
return readNReplies(commands.length, readDelim(redisConn, CRLF_RAW));
return readNReplies(commands.length, readDelim(redisConn, CRLF));
}

async function* readReplies(
redisConn: Deno.Conn,
raw = false,
): AsyncIterableIterator<Reply> {
const iterator = readDelim(redisConn, CRLF_RAW);
const iterator = readDelim(redisConn, CRLF);
while (true) {
yield await readReply(iterator, raw);
}
}

class AsyncQueue {
#queue: Promise<any> = Promise.resolve();

async enqueue<T>(task: () => Promise<T>): Promise<T> {
this.#queue = this.#queue.then(task);
return await this.#queue;
}
}

/**
* A Redis client that can be used to send commands to a Redis server.
*
* @example
* ```ts ignore
* import { RedisClient } from "jsr:@iuioiua/r2d2";
*
* const redisConn = await Deno.connect({ port: 6379 });
* const redisClient = new RedisClient(redisConn);
*
* // Returns "OK"
* await redisClient.sendCommand(["SET", "hello", "world"]);
*
* // Returns "world"
* await redisClient.sendCommand(["GET", "hello"]);
* ```
*/
export class RedisClient {
#conn: Deno.TcpConn | Deno.TlsConn;
#queue: AsyncQueue;
#queue: Promise<any> = Promise.resolve();

/** Constructs a new instance. */
constructor(conn: Deno.TcpConn | Deno.TlsConn) {
this.#conn = conn;
this.#queue = new AsyncQueue();
}

#enqueue<T>(task: () => Promise<T>): Promise<T> {
this.#queue = this.#queue.then(task);
return this.#queue;
}

/**
Expand All @@ -323,10 +237,8 @@ export class RedisClient {
* await redisClient.sendCommand(["GET", "hello"]);
* ```
*/
async sendCommand(command: Command, raw = false): Promise<Reply> {
return await this.#queue.enqueue(
async () => await sendCommand(this.#conn, command, raw),
);
sendCommand(command: Command, raw = false): Promise<Reply> {
return this.#enqueue(() => sendCommand(this.#conn, command, raw));
}

/**
Expand All @@ -342,10 +254,8 @@ export class RedisClient {
* await redisClient.writeCommand(["SHUTDOWN"]);
* ```
*/
async writeCommand(command: Command) {
await this.#queue.enqueue(
async () => await writeCommand(this.#conn, command),
);
writeCommand(command: Command): Promise<void> {
return this.#enqueue(() => writeCommand(this.#conn, command));
}

/**
Expand Down Expand Up @@ -389,9 +299,7 @@ export class RedisClient {
* ]);
* ```
*/
async pipelineCommands(commands: Command[]): Promise<Reply[]> {
return await this.#queue.enqueue(
async () => await pipelineCommands(this.#conn, commands),
);
pipelineCommands(commands: Command[]): Promise<Reply[]> {
return this.#enqueue(() => pipelineCommands(this.#conn, commands));
}
}

0 comments on commit 26494d7

Please sign in to comment.