Skip to content

Commit

Permalink
fix: reintroduce deno streams
Browse files Browse the repository at this point in the history
  • Loading branch information
greg6775 committed Dec 17, 2023
1 parent 1758c7e commit 28120b7
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
2 changes: 2 additions & 0 deletions deps.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
export * from "https://deno.land/x/[email protected]/mod.js";
export { writeAll } from "https://deno.land/[email protected]/streams/write_all.ts";
export { crypto } from "https://deno.land/[email protected]/crypto/mod.ts";
export { BufReader } from "https://deno.land/[email protected]/io/mod.ts";
export * as b64 from "https://deno.land/[email protected]/encoding/base64.ts";
export * as hex from "https://deno.land/[email protected]/encoding/hex.ts";
22 changes: 11 additions & 11 deletions src/protocol/protocol.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Document } from "../../deps.ts";
import { BufReader, Document, writeAll } from "../../deps.ts";
import {
MongoDriverError,
MongoErrorInfo,
Expand All @@ -8,7 +8,7 @@ import { handshake } from "./handshake.ts";
import { parseHeader } from "./header.ts";
import { deserializeMessage, Message, serializeMessage } from "./message.ts";

type Socket = ReadableStream & WritableStream;
type Socket = Deno.Reader & Deno.Writer;
interface CommandTask {
requestId: number;
db: string;
Expand All @@ -27,12 +27,12 @@ export class WireProtocol {
// deno-lint-ignore no-explicit-any
reject: (reason?: any) => void;
}> = new Map();
#reader: ReadableStreamBYOBReader;
#reader: BufReader;
#commandQueue: CommandTask[] = [];

constructor(socket: Socket) {
this.#socket = socket;
this.#reader = new ReadableStreamBYOBReader(this.#socket);
this.#reader = new BufReader(this.#socket);
}

async connect() {
Expand Down Expand Up @@ -97,7 +97,7 @@ export class WireProtocol {
],
});

await ReadableStream.from(buffer).pipeTo(this.#socket);
await writeAll(this.#socket, buffer);
}
this.#isPendingRequest = false;
}
Expand All @@ -106,18 +106,18 @@ export class WireProtocol {
if (this.#isPendingResponse) return;
this.#isPendingResponse = true;
while (this.#pendingResponses.size > 0) {
const headerBuffer = await this.#reader.read(new Uint8Array(16));
if (!headerBuffer.value) {
const headerBuffer = await this.#reader.readFull(new Uint8Array(16));
if (!headerBuffer) {
throw new MongoDriverError("Invalid response header");
}
const header = parseHeader(headerBuffer.value);
const bodyBuffer = await this.#reader.read(
const header = parseHeader(headerBuffer);
const bodyBuffer = await this.#reader.readFull(
new Uint8Array(header.messageLength - 16),
);
if (!bodyBuffer.value) {
if (!bodyBuffer) {
throw new MongoDriverError("Invalid response body");
}
const reply = deserializeMessage(header, bodyBuffer.value);
const reply = deserializeMessage(header, bodyBuffer);
const pendingMessage = this.#pendingResponses.get(header.responseTo);
this.#pendingResponses.delete(header.responseTo);
pendingMessage?.resolve(reply);
Expand Down

0 comments on commit 28120b7

Please sign in to comment.