diff --git a/packages/node/src/Stream/Experimental/index.ts b/packages/node/src/Stream/Experimental/index.ts new file mode 100644 index 0000000..6a2fae0 --- /dev/null +++ b/packages/node/src/Stream/Experimental/index.ts @@ -0,0 +1,129 @@ +/** + * ets_tracing: off + */ + +import type * as C from "@effect-ts/core/Collections/Immutable/Chunk" +import * as T from "@effect-ts/core/Effect" +import * as S from "@effect-ts/core/Effect/Experimental/Stream" +import * as Sink from "@effect-ts/core/Effect/Experimental/Stream/Sink" +import * as M from "@effect-ts/core/Effect/Managed" +import { pipe } from "@effect-ts/core/Function" +import type * as stream from "stream" + +import * as Byte from "../../Byte" + +export class ReadableError { + readonly _tag = "ReadableError" + constructor(readonly error: Error) {} +} + +/** + * Captures a Node `Readable`, converting it into a `Stream`. The size + * + * Note: your Readable should not have an encoding set in order to work with buffers, + * calling this with a Readable with an encoding set will `Die`. + */ +export function streamFromReadable( + r: () => stream.Readable, + bufferSize: number = S.DEFAULT_CHUNK_SIZE +): S.Stream { + return pipe( + T.succeedWith(r), + T.tap((sr) => + sr.readableEncoding != null + ? T.dieMessage( + `stream.Readable encoding set to ${sr.readableEncoding} cannot be used to produce Buffer` + ) + : T.unit + ), + S.acquireReleaseWith((sr) => + T.succeedWith(() => { + sr.destroy() + }) + ), + S.chain((sr) => + S.async((emit) => { + sr.on("readable", () => { + let buffer: Buffer | null = null + + while ((buffer = sr.read(bufferSize)) !== null) { + emit.chunk(Byte.chunk(buffer)) + } + }) + sr.on("end", () => { + emit.end() + }) + sr.on("error", (err) => { + emit.fail(new ReadableError(err)) + }) + }) + ) + ) +} + +export class WritableError { + readonly _tag = "WritableError" + constructor(readonly error: Error) {} +} + +/** + * Uses the provided NodeJS `Writable` stream to create a `Sink` that consumes + * byte chunks and writes them to the `Writable` stream. The sink will yield + * the count of bytes written. + * + * The `Writable` stream will be automatically closed after the stream is + * finished or an error occurred. + */ +export function sinkFromWritable(w: () => stream.Writable) { + return Sink.unwrapManaged( + pipe( + T.succeedWith(w), + M.makeExit((sw) => + T.succeedWith(() => { + sw.destroy() + }) + ), + M.map((sw) => + Sink.foldLeftChunksEffect(0, (bytesWritten, byteChunk: C.Chunk) => + T.effectAsync((resume) => { + sw.write(Byte.buffer(byteChunk), (err) => { + if (err) { + resume(T.fail(new WritableError(err))) + } else { + resume(T.succeed(bytesWritten + byteChunk.length)) + } + }) + }) + ) + ) + ) + ) +} + +export class TransformError { + readonly _tag = "TransformError" + constructor(readonly error: Error) {} +} + +/** + * A sink that collects all of its inputs into a `Buffer`. + */ +export function collectBuffer(): Sink.Sink< + unknown, + E, + Byte.Byte, + E, + unknown, + Buffer +> { + return Sink.map_(Sink.collectAll(), Byte.buffer) +} + +/** + * Runs the stream and collects all of its elements to a buffer. + */ +export function runBuffer( + self: S.Stream +): T.Effect { + return S.run_(self, collectBuffer()) +} diff --git a/packages/node/test/stream.experimental.test.ts b/packages/node/test/stream.experimental.test.ts new file mode 100644 index 0000000..f6c39f9 --- /dev/null +++ b/packages/node/test/stream.experimental.test.ts @@ -0,0 +1,54 @@ +import * as C from "@effect-ts/core/Collections/Immutable/Chunk" +import * as T from "@effect-ts/core/Effect" +import * as S from "@effect-ts/core/Effect/Experimental/Stream" +import * as Ref from "@effect-ts/core/Effect/Ref" +import { pipe } from "@effect-ts/core/Function" +import * as fs from "fs" +import * as path from "path" +import * as stream from "stream" + +import * as Byte from "../src/Byte" +import * as NS from "../src/Stream/Experimental" + +describe("Node Stream", () => { + it("should build an Effect-TS Stream from a NodeJS stream.Readable", async () => { + const res = await pipe( + NS.streamFromReadable(() => + fs.createReadStream(path.join(__dirname, "fix/data.txt")) + ), + NS.runBuffer, + T.runPromise + ) + + expect(res.toString("utf-8")).toEqual("a, b, c") + }) + + it("should build an Effect-TS Sink from a NodeJS stream.Writable", async () => { + const mockStream = new stream.PassThrough() + let output: C.Chunk = C.empty() + + mockStream.on("data", (chunk) => { + output = C.concat_(output, Byte.chunk(chunk)) + }) + + const res = await pipe( + T.do, + T.bind("bytesWritten", () => + pipe( + Ref.makeRef(0), + T.map((ref) => + S.repeatEffect(T.delay(10)(Ref.updateAndGet_(ref, (n) => n + 1))) + ), + S.unwrap, + S.take(5), + S.map(Byte.byte), + S.run(NS.sinkFromWritable(() => mockStream)) + ) + ), + T.runPromise + ) + + expect(res.bytesWritten).toEqual(5) + expect(C.toArray(output)).toEqual([1, 2, 3, 4, 5]) + }) +})