From 26494d7c67234db87d382770b166cf104e301962 Mon Sep 17 00:00:00 2001 From: Asher Gomez Date: Tue, 17 Dec 2024 10:30:19 +1100 Subject: [PATCH] refactor: cleanup `readReply()` logic (#181) * refactor: cleanup `readReply()` logic * simplify `readNReplies()` * fix `RedisClient.prototype.writeCommand()` return type * update `deno.json` --- deno.json | 3 +- mod.ts | 242 +++++++++++++++++------------------------------------- 2 files changed, 76 insertions(+), 169 deletions(-) diff --git a/deno.json b/deno.json index e9a81de..a1860af 100644 --- a/deno.json +++ b/deno.json @@ -7,8 +7,7 @@ "@std/bytes": "jsr:@std/bytes@^0.220.1", "@std/collections": "jsr:@std/collections@^0.220.1", "@std/fmt": "jsr:@std/fmt@^0.220.1", - "@std/io": "jsr:@std/io@^0.220.1", - "jsr:@iuioiua/r2d2/": "./" + "@std/io": "jsr:@std/io@^0.220.1" }, "tasks": { "redis:start": "redis-server --save \"\" --appendonly no --daemonize yes", diff --git a/mod.ts b/mod.ts index 3a727c3..9e571a9 100644 --- a/mod.ts +++ b/mod.ts @@ -38,12 +38,12 @@ export type Reply = const encoder = new TextEncoder(); const decoder = new TextDecoder(); -const CRLF = "\r\n"; -const CRLF_RAW = encoder.encode(CRLF); +const CRLF_STRING = "\r\n"; const ARRAY_PREFIX_STRING = "*"; const BULK_STRING_PREFIX_STRING = "$"; +const CRLF = encoder.encode(CRLF_STRING); const ARRAY_PREFIX = ARRAY_PREFIX_STRING.charCodeAt(0); const ATTRIBUTE_PREFIX = "|".charCodeAt(0); const BIG_NUMBER_PREFIX = "(".charCodeAt(0); @@ -66,16 +66,20 @@ const VERBATIM_STRING_PREFIX = "=".charCodeAt(0); * @see {@link https://redis.io/docs/reference/protocol-spec/#send-commands-to-a-redis-server} */ function createRequest(command: Command): Uint8Array { - const lines = [encoder.encode(ARRAY_PREFIX_STRING + command.length + CRLF)]; + const lines = [ + encoder.encode(ARRAY_PREFIX_STRING + command.length + CRLF_STRING), + ]; for (const arg of command) { const bytes = arg instanceof Uint8Array ? arg : encoder.encode(arg.toString()); lines.push( - encoder.encode(BULK_STRING_PREFIX_STRING + bytes.byteLength + CRLF), + encoder.encode( + BULK_STRING_PREFIX_STRING + bytes.byteLength + CRLF_STRING, + ), + bytes, + CRLF, ); - lines.push(bytes); - lines.push(CRLF_RAW); } return concat(lines); } @@ -87,110 +91,12 @@ async function writeCommand( await writeAll(writer, createRequest(command)); } -function removePrefix(line: Uint8Array): string { - return decoder.decode(line.slice(1)); -} - -function toObject(array: any[]): Record { - return Object.fromEntries(chunk(array, 2)); -} - -async function readNReplies( +function readNReplies( length: number, iterator: AsyncIterableIterator, raw = false, ): Promise { - const replies: Reply[] = []; - for (let i = 0; i < length; i++) { - replies.push(await readReply(iterator, raw)); - } - return replies; -} - -async function readArray( - line: Uint8Array, - iterator: AsyncIterableIterator, -): Promise { - const length = readNumberOrDouble(line); - return length === -1 ? null : await readNReplies(length, iterator); -} - -/** - * Read but don't return attribute data. - * - * @todo include attribute data somehow - */ -async function readAttribute( - line: Uint8Array, - iterator: AsyncIterableIterator, - raw = false, -): Promise { - await readMap(line, iterator); - return await readReply(iterator, raw); -} - -function readBigNumber(line: Uint8Array): bigint { - return BigInt(removePrefix(line)); -} - -async function readBlobError( - iterator: AsyncIterableIterator, -): Promise { - /** Skip to reading the next line, which is a string */ - const { value } = await iterator.next(); - return await Promise.reject(decoder.decode(value)); -} - -function readBoolean(line: Uint8Array): boolean { - return removePrefix(line) === "t"; -} - -async function readBulkOrVerbatimString( - line: Uint8Array, - iterator: AsyncIterableIterator, - raw = false, -): Promise { - if (readNumberOrDouble(line) === -1) { - return null; - } - const { value } = await iterator.next(); - return raw ? value : decoder.decode(value); -} - -async function readError(line: Uint8Array): Promise { - return await Promise.reject(removePrefix(line)); -} - -async function readMap( - line: Uint8Array, - iterator: AsyncIterableIterator, -): Promise> { - const length = readNumberOrDouble(line) * 2; - const array = await readNReplies(length, iterator); - return toObject(array); -} - -function readNumberOrDouble(line: Uint8Array): number { - const number = removePrefix(line); - switch (number) { - case "inf": - return Infinity; - case "-inf": - return -Infinity; - default: - return Number(number); - } -} - -async function readSet( - line: Uint8Array, - iterator: AsyncIterableIterator, -): Promise> { - return new Set(await readArray(line, iterator)); -} - -function readSimpleString(line: Uint8Array): string { - return removePrefix(line); + return Array.fromAsync({ length }, () => readReply(iterator, raw)); } /** @@ -206,36 +112,66 @@ export async function readReply( ): Promise { const { value } = await iterator.next(); if (value.length === 0) { - return await Promise.reject(new TypeError("No reply received")); + return Promise.reject(new TypeError("No reply received")); } + const line = decoder.decode(value.slice(1)); switch (value[0]) { case ARRAY_PREFIX: - case PUSH_PREFIX: - return await readArray(value, iterator); - case ATTRIBUTE_PREFIX: - return await readAttribute(value, iterator); + case PUSH_PREFIX: { + const length = Number(line); + return length === -1 ? null : await readNReplies(length, iterator); + } + case ATTRIBUTE_PREFIX: { + /** + * Read but don't return attribute data. + * + * @todo include attribute data somehow + */ + const length = Number(line) * 2; + await readNReplies(length, iterator); + return readReply(iterator, raw); + } case BIG_NUMBER_PREFIX: - return readBigNumber(value); - case BLOB_ERROR_PREFIX: - return readBlobError(iterator); + return BigInt(line); + case BLOB_ERROR_PREFIX: { + /** Skip to reading the next line, which is a string */ + const { value } = await iterator.next(); + return Promise.reject(decoder.decode(value)); + } case BOOLEAN_PREFIX: - return readBoolean(value); + return line === "t"; case BULK_STRING_PREFIX: - case VERBATIM_STRING_PREFIX: - return await readBulkOrVerbatimString(value, iterator, raw); + case VERBATIM_STRING_PREFIX: { + if (Number(line) === -1) { + return null; + } + const { value } = await iterator.next(); + return raw ? value : decoder.decode(value); + } case DOUBLE_PREFIX: - case INTEGER_PREFIX: - return readNumberOrDouble(value); + case INTEGER_PREFIX: { + switch (line) { + case "inf": + return Infinity; + case "-inf": + return -Infinity; + default: + return Number(line); + } + } case ERROR_PREFIX: - return readError(value); - case MAP_PREFIX: - return await readMap(value, iterator); + return Promise.reject(line); + case MAP_PREFIX: { + const length = Number(line) * 2; + const array = await readNReplies(length, iterator); + return Object.fromEntries(chunk(array, 2)); + } case NULL_PREFIX: return null; case SET_PREFIX: - return await readSet(value, iterator); + return new Set(await readNReplies(Number(line), iterator, raw)); case SIMPLE_STRING_PREFIX: - return readSimpleString(value); + return line; /** No prefix */ default: return decoder.decode(value); @@ -248,7 +184,7 @@ async function sendCommand( raw = false, ): Promise { await writeCommand(redisConn, command); - return await readReply(readDelim(redisConn, CRLF_RAW), raw); + return readReply(readDelim(redisConn, CRLF), raw); } async function pipelineCommands( @@ -257,53 +193,31 @@ async function pipelineCommands( ): Promise { const bytes = commands.map(createRequest); await writeAll(redisConn, concat(bytes)); - return readNReplies(commands.length, readDelim(redisConn, CRLF_RAW)); + return readNReplies(commands.length, readDelim(redisConn, CRLF)); } async function* readReplies( redisConn: Deno.Conn, raw = false, ): AsyncIterableIterator { - const iterator = readDelim(redisConn, CRLF_RAW); + const iterator = readDelim(redisConn, CRLF); while (true) { yield await readReply(iterator, raw); } } -class AsyncQueue { - #queue: Promise = Promise.resolve(); - - async enqueue(task: () => Promise): Promise { - this.#queue = this.#queue.then(task); - return await this.#queue; - } -} - -/** - * A Redis client that can be used to send commands to a Redis server. - * - * @example - * ```ts ignore - * import { RedisClient } from "jsr:@iuioiua/r2d2"; - * - * const redisConn = await Deno.connect({ port: 6379 }); - * const redisClient = new RedisClient(redisConn); - * - * // Returns "OK" - * await redisClient.sendCommand(["SET", "hello", "world"]); - * - * // Returns "world" - * await redisClient.sendCommand(["GET", "hello"]); - * ``` - */ export class RedisClient { #conn: Deno.TcpConn | Deno.TlsConn; - #queue: AsyncQueue; + #queue: Promise = Promise.resolve(); /** Constructs a new instance. */ constructor(conn: Deno.TcpConn | Deno.TlsConn) { this.#conn = conn; - this.#queue = new AsyncQueue(); + } + + #enqueue(task: () => Promise): Promise { + this.#queue = this.#queue.then(task); + return this.#queue; } /** @@ -323,10 +237,8 @@ export class RedisClient { * await redisClient.sendCommand(["GET", "hello"]); * ``` */ - async sendCommand(command: Command, raw = false): Promise { - return await this.#queue.enqueue( - async () => await sendCommand(this.#conn, command, raw), - ); + sendCommand(command: Command, raw = false): Promise { + return this.#enqueue(() => sendCommand(this.#conn, command, raw)); } /** @@ -342,10 +254,8 @@ export class RedisClient { * await redisClient.writeCommand(["SHUTDOWN"]); * ``` */ - async writeCommand(command: Command) { - await this.#queue.enqueue( - async () => await writeCommand(this.#conn, command), - ); + writeCommand(command: Command): Promise { + return this.#enqueue(() => writeCommand(this.#conn, command)); } /** @@ -389,9 +299,7 @@ export class RedisClient { * ]); * ``` */ - async pipelineCommands(commands: Command[]): Promise { - return await this.#queue.enqueue( - async () => await pipelineCommands(this.#conn, commands), - ); + pipelineCommands(commands: Command[]): Promise { + return this.#enqueue(() => pipelineCommands(this.#conn, commands)); } }