Skip to content

Commit

Permalink
fix: use Deno.Command internally instead of Deno.run (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
dsherret authored Apr 29, 2023
1 parent f0deaab commit 06c07dd
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 55 deletions.
6 changes: 3 additions & 3 deletions dprint.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
"**/*.generated.js"
],
"plugins": [
"https://plugins.dprint.dev/typescript-0.79.0.wasm",
"https://plugins.dprint.dev/json-0.17.0.wasm",
"https://plugins.dprint.dev/markdown-0.15.1.wasm",
"https://plugins.dprint.dev/typescript-0.84.2.wasm",
"https://plugins.dprint.dev/json-0.17.2.wasm",
"https://plugins.dprint.dev/markdown-0.15.2.wasm",
"https://plugins.dprint.dev/toml-0.5.4.wasm",
"https://plugins.dprint.dev/exec-0.3.5.json@d687dda57be0fe9a0088ccdaefa5147649ff24127d8b3ea227536c68ee7abeab"
]
Expand Down
3 changes: 2 additions & 1 deletion mod.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import {
assertRejects,
assertStringIncludes,
assertThrows,
readerFromStreamReader,
withTempDir,
} from "./src/deps.test.ts";
import { Buffer, colors, path, readerFromStreamReader } from "./src/deps.ts";
import { Buffer, colors, path } from "./src/deps.ts";

Deno.test("should get stdout when piped", async () => {
const output = await $`echo 5`.stdout("piped");
Expand Down
1 change: 1 addition & 0 deletions src/deps.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export {
assertStringIncludes,
assertThrows,
} from "https://deno.land/[email protected]/testing/asserts.ts";
export { readerFromStreamReader } from "https://deno.land/[email protected]/streams/reader_from_stream_reader.ts";
export { writableStreamFromWriter } from "https://deno.land/[email protected]/streams/writable_stream_from_writer.ts";
export { serve } from "https://deno.land/[email protected]/http/server.ts";

Expand Down
95 changes: 44 additions & 51 deletions src/shell.ts
Original file line number Diff line number Diff line change
Expand Up @@ -570,14 +570,17 @@ async function executeCommandArgs(commandArgs: string[], context: Context): Prom
return executeCommandArgs([...resolvedCommand.args, resolvedCommand.path, ...commandArgs.slice(1)], context);
}
const _assertIsPath: "path" = resolvedCommand.kind;
const p = Deno.run({
cmd: [resolvedCommand.path, ...commandArgs.slice(1)],
cwd: context.getCwd(),
env: context.getEnvVars(),
const pipeStringVals = {
stdin: getStdioStringValue(context.stdin),
stdout: getStdioStringValue(context.stdout.kind),
stderr: getStdioStringValue(context.stderr.kind),
});
};
const p = new Deno.Command(resolvedCommand.path, {
args: commandArgs.slice(1),
cwd: context.getCwd(),
env: context.getEnvVars(),
...pipeStringVals,
}).spawn();
const abortListener = () => p.kill("SIGKILL");
context.signal.addEventListener("abort", abortListener);
const completeController = new AbortController();
Expand All @@ -602,10 +605,14 @@ async function executeCommandArgs(commandArgs: string[], context: Context): Prom
}
});
try {
const readStdoutTask = readStdOutOrErr(p.stdout, context.stdout);
const readStderrTask = readStdOutOrErr(p.stderr, context.stderr);
const readStdoutTask = pipeStringVals.stdout === "piped"
? readStdOutOrErr(p.stdout, context.stdout)
: Promise.resolve();
const readStderrTask = pipeStringVals.stderr === "piped"
? readStdOutOrErr(p.stderr, context.stderr)
: Promise.resolve();
const [status] = await Promise.all([
p.status(),
p.status,
readStdoutTask,
readStderrTask,
]);
Expand All @@ -622,84 +629,70 @@ async function executeCommandArgs(commandArgs: string[], context: Context): Prom
} finally {
completeController.abort();
context.signal.removeEventListener("abort", abortListener);
p.close();
try {
p.stdin?.close();
} catch {
// ignore
}
try {
p.stdout?.close();
} catch {
// ignore
}
try {
p.stderr?.close();
} catch {
// ignore
}

// ensure this is done before exiting... it should never throw and it should exit quickly
// ensure this is done before exiting... it will never throw
await stdinPromise;
}

async function writeStdin(stdin: ShellPipeReader, p: Deno.Process, signal: AbortSignal) {
async function writeStdin(stdin: ShellPipeReader, p: Deno.ChildProcess, signal: AbortSignal) {
if (typeof stdin === "string") {
return;
}
await pipeReaderToWriter(stdin, p.stdin!, signal);
await pipeReaderToWriter(stdin, p.stdin, signal);
try {
p.stdin!.close();
await p.stdin.close();
} catch {
// ignore
}
}

async function readStdOutOrErr(reader: Deno.Reader | null, writer: ShellPipeWriter) {
if (typeof writer === "string" || reader == null) {
async function readStdOutOrErr(readable: ReadableStream<Uint8Array>, writer: ShellPipeWriter) {
if (typeof writer === "string") {
return;
}
// don't abort... ensure all of stdout/stderr is read in case the process
// exits before this finishes
await pipeReaderToWriterSync(reader, writer, new AbortController().signal);
await pipeReaderToWriterSync(readable, writer, new AbortController().signal);
}

async function pipeReaderToWriter(reader: Deno.Reader, writer: Deno.Writer, signal: AbortSignal) {
async function pipeReaderToWriter(reader: Deno.Reader, writable: WritableStream<Uint8Array>, signal: AbortSignal) {
const abortedPromise = new Promise<void>((resolve) => {
signal.addEventListener("abort", listener);
function listener() {
signal.removeEventListener("abort", listener);
resolve();
}
});
while (!signal.aborted) {
const buffer = new Uint8Array(1024);
const length = await Promise.race([abortedPromise, reader.read(buffer)]);
if (length === 0 || length == null) {
break;
}
await writeAll(buffer.subarray(0, length));
}

async function writeAll(arr: Uint8Array) {
let nwritten = 0;
while (nwritten < arr.length && !signal.aborted) {
nwritten += await writer.write(arr.subarray(nwritten));
const writer = writable.getWriter();
try {
while (!signal.aborted) {
const buffer = new Uint8Array(1024);
const length = await Promise.race([abortedPromise, reader.read(buffer)]);
if (length === 0 || length == null) {
break;
}
await writer.write(buffer.subarray(0, length));
}
} finally {
await writer.close();
}
}

async function pipeReaderToWriterSync(reader: Deno.Reader, writer: Deno.WriterSync, signal: AbortSignal) {
async function pipeReaderToWriterSync(
readable: ReadableStream<Uint8Array>,
writer: Deno.WriterSync,
signal: AbortSignal,
) {
const reader = readable.getReader();
while (!signal.aborted) {
const buffer = new Uint8Array(1024);
const length = await reader.read(buffer);
if (length === 0 || length == null) {
const result = await reader.read();
if (result.done) {
break;
}
writeAll(buffer.subarray(0, length));
writeAllSync(result.value);
}

function writeAll(arr: Uint8Array) {
function writeAllSync(arr: Uint8Array) {
let nwritten = 0;
while (nwritten < arr.length && !signal.aborted) {
nwritten += writer.writeSync(arr.subarray(nwritten));
Expand Down

0 comments on commit 06c07dd

Please sign in to comment.