Skip to content

Commit

Permalink
refactor: move line reading functionality in-house (#182)
Browse files Browse the repository at this point in the history
* refactor: move line reading functionality in-house

* cleanup

* work

* work

* work

* work

* work
  • Loading branch information
iuioiua authored Dec 17, 2024
1 parent 26494d7 commit d72ab43
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 18 deletions.
61 changes: 55 additions & 6 deletions mod.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// deno-lint-ignore-file no-explicit-any
import { chunk } from "@std/collections/chunk";
import { concat } from "@std/bytes/concat";
import { readDelim } from "@std/io/read_delim";
import { writeAll } from "@std/io/write_all";
import type { Writer } from "@std/io/types";
import type { Reader, Writer } from "@std/io/types";

/**
* A Redis client that can be used to send commands to a Redis server.
Expand Down Expand Up @@ -91,6 +90,56 @@ async function writeCommand(
await writeAll(writer, createRequest(command));
}

const DELIM_LPS = new Uint8Array([0, 0]);

/**
* Reads and processes the response line-by-line. Exported for testing.
*
* @private
*/
export async function* readLines(
reader: Reader,
): AsyncIterableIterator<Uint8Array> {
let chunks = new Uint8Array();

// Modified KMP
let inspectIndex = 0;
let matchIndex = 0;
while (true) {
const inspectArr = new Uint8Array(1024);
const result = await reader.read(inspectArr);
if (result === null) {
// Yield last chunk.
yield chunks;
break;
}
chunks = concat([chunks, inspectArr.slice(0, result)]);
let localIndex = 0;
while (inspectIndex < chunks.length) {
if (inspectArr[localIndex] === CRLF[matchIndex]) {
inspectIndex++;
localIndex++;
matchIndex++;
if (matchIndex === DELIM_LPS.length) {
// Full match
const matchEnd = inspectIndex - DELIM_LPS.length;
const readyBytes = chunks.slice(0, matchEnd);
yield readyBytes;
// Reset match, different from KMP.
chunks = chunks.slice(inspectIndex);
inspectIndex = 0;
matchIndex = 0;
}
} else {
if (matchIndex === 0) {
inspectIndex++;
localIndex++;
}
}
}
}
}

function readNReplies(
length: number,
iterator: AsyncIterableIterator<Uint8Array>,
Expand All @@ -100,7 +149,7 @@ function readNReplies(
}

/**
* Reads and processes the response line-by-line. Exported for testing.
* Reads and processes the response reply-by-reply. Exported for testing.
*
* @see {@link https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md}
*
Expand Down Expand Up @@ -184,7 +233,7 @@ async function sendCommand(
raw = false,
): Promise<Reply> {
await writeCommand(redisConn, command);
return readReply(readDelim(redisConn, CRLF), raw);
return readReply(readLines(redisConn), raw);
}

async function pipelineCommands(
Expand All @@ -193,14 +242,14 @@ async function pipelineCommands(
): Promise<Reply[]> {
const bytes = commands.map(createRequest);
await writeAll(redisConn, concat(bytes));
return readNReplies(commands.length, readDelim(redisConn, CRLF));
return readNReplies(commands.length, readLines(redisConn));
}

async function* readReplies(
redisConn: Deno.Conn,
raw = false,
): AsyncIterableIterator<Reply> {
const iterator = readDelim(redisConn, CRLF);
const iterator = readLines(redisConn);
while (true) {
yield await readReply(iterator, raw);
}
Expand Down
24 changes: 12 additions & 12 deletions test.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
import { assertEquals, assertRejects } from "@std/assert";
import { StringReader } from "@std/io/string_reader";
import { readDelim } from "@std/io/read_delim";
import { type Command, readReply, RedisClient, type Reply } from "./mod.ts";
import { Buffer } from "@std/io/buffer";
import {
type Command,
readLines,
readReply,
RedisClient,
type Reply,
} from "./mod.ts";

const encoder = new TextEncoder();

const CRLF = "\r\n";

async function readReplyTest(output: string, expected: Reply) {
assertEquals(
await readReply(readDelim(new StringReader(output), encoder.encode(CRLF))),
await readReply(readLines(new Buffer(encoder.encode(output)))),
expected,
);
}

async function readReplyRejectTest(output: string, expected: string) {
await assertRejects(
async () =>
await readReply(
readDelim(new StringReader(output), encoder.encode(CRLF)),
),
function readReplyRejectTest(output: string, expected: string) {
return assertRejects(
() => readReply(readLines(new Buffer(encoder.encode(output)))),
expected,
);
}
Expand Down

0 comments on commit d72ab43

Please sign in to comment.