Skip to content

Commit

Permalink
refactor!: remove streamed response decoding (#173)
Browse files Browse the repository at this point in the history
  • Loading branch information
iuioiua authored Apr 18, 2024
1 parent d849bb6 commit e281c77
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 90 deletions.
71 changes: 4 additions & 67 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ const SET_PREFIX = "~".charCodeAt(0);
const SIMPLE_STRING_PREFIX = "+".charCodeAt(0);
const VERBATIM_STRING_PREFIX = "=".charCodeAt(0);

const STREAMED_REPLY_START_DELIMITER = "?".charCodeAt(0);
const STREAMED_STRING_END_DELIMITER = ";0";
const STREAMED_AGGREGATE_END_DELIMITER = ".";

/**
* Transforms a command, which is an array of arguments, into an RESP request.
*
Expand Down Expand Up @@ -94,10 +90,6 @@ function removePrefix(line: Uint8Array): string {
return decoder.decode(line.slice(1));
}

function isSteamedReply(line: Uint8Array): boolean {
return line[1] === STREAMED_REPLY_START_DELIMITER;
}

function toObject(array: any[]): Record<string, any> {
return Object.fromEntries(chunk(array, 2));
}
Expand All @@ -114,22 +106,6 @@ async function readNReplies(
return replies;
}

async function readStreamedReply(
delimiter: string,
iterator: AsyncIterableIterator<Uint8Array>,
raw = false,
): Promise<Reply[]> {
const replies: Reply[] = [];
while (true) {
const reply = await readReply(iterator, raw);
if (reply === delimiter) {
break;
}
replies.push(reply);
}
return replies;
}

async function readArray(
line: Uint8Array,
iterator: AsyncIterableIterator<Uint8Array>,
Expand Down Expand Up @@ -216,37 +192,6 @@ function readSimpleString(line: Uint8Array): string {
return removePrefix(line);
}

async function readStreamedArray(
iterator: AsyncIterableIterator<Uint8Array>,
): Promise<Reply[]> {
return await readStreamedReply(STREAMED_AGGREGATE_END_DELIMITER, iterator);
}

async function readStreamedMap(
iterator: AsyncIterableIterator<Uint8Array>,
): Promise<Record<string, any>> {
const array = await readStreamedReply(
STREAMED_AGGREGATE_END_DELIMITER,
iterator,
);
return toObject(array);
}

async function readStreamedSet(
iterator: AsyncIterableIterator<Uint8Array>,
): Promise<Set<Reply>> {
return new Set(await readStreamedArray(iterator));
}

async function readStreamedString(
iterator: AsyncIterableIterator<Uint8Array>,
): Promise<string> {
return (await readStreamedReply(STREAMED_STRING_END_DELIMITER, iterator))
/** Remove byte counts */
.filter((line) => !(line as string).startsWith(";"))
.join("");
}

/**
* Reads and processes the response line-by-line. Exported for testing.
*
Expand All @@ -265,9 +210,7 @@ export async function readReply(
switch (value[0]) {
case ARRAY_PREFIX:
case PUSH_PREFIX:
return isSteamedReply(value)
? await readStreamedArray(iterator)
: await readArray(value, iterator);
return await readArray(value, iterator);
case ATTRIBUTE_PREFIX:
return await readAttribute(value, iterator);
case BIG_NUMBER_PREFIX:
Expand All @@ -278,24 +221,18 @@ export async function readReply(
return readBoolean(value);
case BULK_STRING_PREFIX:
case VERBATIM_STRING_PREFIX:
return isSteamedReply(value)
? await readStreamedString(iterator)
: await readBulkOrVerbatimString(value, iterator, raw);
return await readBulkOrVerbatimString(value, iterator, raw);
case DOUBLE_PREFIX:
case INTEGER_PREFIX:
return readNumberOrDouble(value);
case ERROR_PREFIX:
return readError(value);
case MAP_PREFIX:
return isSteamedReply(value)
? await readStreamedMap(iterator)
: await readMap(value, iterator);
return await readMap(value, iterator);
case NULL_PREFIX:
return null;
case SET_PREFIX:
return isSteamedReply(value)
? await readStreamedSet(iterator)
: await readSet(value, iterator);
return await readSet(value, iterator);
case SIMPLE_STRING_PREFIX:
return readSimpleString(value);
/** No prefix */
Expand Down
23 changes: 0 additions & 23 deletions test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,29 +137,6 @@ Deno.test("readReply() - set", async () => {
Deno.test("readReply() - simple string", async () =>
await readReplyTest("+OK\r\n", "OK"));

Deno.test("readReply() - streamed string", async () => {
await readReplyTest(
"$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;1\r\nd\r\n;0\r\n",
"Hello word",
);
});

/** @todo test more complex case */
Deno.test("readReply() - streamed array", async () => {
await readReplyTest("*?\r\n:1\r\n:2\r\n:3\r\n.\r\n", [1, 2, 3]);
});

Deno.test("readReply() - streamed set", async () => {
await readReplyTest(
"~?\r\n+a\r\n:1\r\n+b\r\n:2\r\n.\r\n",
new Set(["a", 1, "b", 2]),
);
});

Deno.test("readReply() - streamed map", async () => {
await readReplyTest("%?\r\n+a\r\n:1\r\n+b\r\n:2\r\n.\r\n", { a: 1, b: 2 });
});

Deno.test("readReply() - verbatim string", async () => {
await readReplyTest("=15\r\ntxt:Some string\r\n", "txt:Some string");
});
Expand Down

0 comments on commit e281c77

Please sign in to comment.