Skip to content

Commit 589a473

Browse files
tim-smarteffect-bot
authored andcommitted
add Stream.toAsyncIterableRuntime api (#4680)
1 parent e118bde commit 589a473

File tree

4 files changed

+123
-0
lines changed

4 files changed

+123
-0
lines changed

.changeset/funny-islands-relate.md

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
---
2+
"effect": minor
3+
---
4+
5+
add Stream.toAsyncIterable\* apis
6+
7+
```ts
8+
import { Stream } from "effect"
9+
10+
// Will print:
11+
// 1
12+
// 2
13+
// 3
14+
const stream = Stream.make(1, 2, 3)
15+
for await (const result of Stream.toAsyncIterable(stream)) {
16+
console.log(result)
17+
}
18+
```

packages/effect/src/Stream.ts

+28
Original file line numberDiff line numberDiff line change
@@ -5332,6 +5332,34 @@ export const toReadableStreamRuntime: {
53325332
): ReadableStream<A>
53335333
} = internal.toReadableStreamRuntime
53345334

5335+
/**
5336+
* Converts the stream to a `AsyncIterable` using the provided runtime.
5337+
*
5338+
* @since 3.15.0
5339+
* @category destructors
5340+
*/
5341+
export const toAsyncIterableRuntime: {
5342+
<A, XR>(runtime: Runtime<XR>): <E, R extends XR>(self: Stream<A, E, R>) => AsyncIterable<A>
5343+
<A, E, XR, R extends XR>(self: Stream<A, E, R>, runtime: Runtime<XR>): AsyncIterable<A>
5344+
} = internal.toAsyncIterableRuntime
5345+
5346+
/**
5347+
* Converts the stream to a `AsyncIterable` capturing the required dependencies.
5348+
*
5349+
* @since 3.15.0
5350+
* @category destructors
5351+
*/
5352+
export const toAsyncIterableEffect: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<AsyncIterable<A>, never, R> =
5353+
internal.toAsyncIterableEffect
5354+
5355+
/**
5356+
* Converts the stream to a `AsyncIterable`.
5357+
*
5358+
* @since 3.15.0
5359+
* @category destructors
5360+
*/
5361+
export const toAsyncIterable: <A, E>(self: Stream<A, E>) => AsyncIterable<A> = internal.toAsyncIterable
5362+
53355363
/**
53365364
* Applies the transducer to the stream and emits its outputs.
53375365
*

packages/effect/src/internal/stream.ts

+68
Original file line numberDiff line numberDiff line change
@@ -7200,6 +7200,74 @@ export const transduce = dual<
72007200
}
72017201
)
72027202

7203+
/** @internal */
7204+
export const toAsyncIterableRuntime = dual<
7205+
<A, XR>(
7206+
runtime: Runtime.Runtime<XR>
7207+
) => <E, R extends XR>(self: Stream.Stream<A, E, R>) => AsyncIterable<A>,
7208+
<A, E, XR, R extends XR>(
7209+
self: Stream.Stream<A, E, R>,
7210+
runtime: Runtime.Runtime<XR>
7211+
) => AsyncIterable<A>
7212+
>(
7213+
(args) => isStream(args[0]),
7214+
<A, E, XR, R extends XR>(
7215+
self: Stream.Stream<A, E, R>,
7216+
runtime: Runtime.Runtime<XR>
7217+
): AsyncIterable<A> => {
7218+
const runFork = Runtime.runFork(runtime)
7219+
return {
7220+
[Symbol.asyncIterator]() {
7221+
let currentResolve: ((value: IteratorResult<A>) => void) | undefined = undefined
7222+
let currentReject: ((reason: any) => void) | undefined = undefined
7223+
let fiber: Fiber.RuntimeFiber<void, E> | undefined = undefined
7224+
const latch = Effect.unsafeMakeLatch(false)
7225+
return {
7226+
next() {
7227+
if (!fiber) {
7228+
fiber = runFork(runForEach(self, (value) =>
7229+
latch.whenOpen(Effect.sync(() => {
7230+
latch.unsafeClose()
7231+
currentResolve!({ done: false, value })
7232+
currentResolve = currentReject = undefined
7233+
}))))
7234+
fiber.addObserver((exit) => {
7235+
fiber = Effect.runFork(latch.whenOpen(Effect.sync(() => {
7236+
if (exit._tag === "Failure") {
7237+
currentReject!(Cause.squash(exit.cause))
7238+
} else {
7239+
currentResolve!({ done: true, value: void 0 })
7240+
}
7241+
currentResolve = currentReject = undefined
7242+
})))
7243+
})
7244+
}
7245+
return new Promise<IteratorResult<A>>((resolve, reject) => {
7246+
currentResolve = resolve
7247+
currentReject = reject
7248+
latch.unsafeOpen()
7249+
})
7250+
},
7251+
return() {
7252+
if (!fiber) return Promise.resolve({ done: true, value: void 0 })
7253+
return Effect.runPromise(Effect.as(Fiber.interrupt(fiber), { done: true, value: void 0 }))
7254+
}
7255+
}
7256+
}
7257+
}
7258+
}
7259+
)
7260+
7261+
/** @internal */
7262+
export const toAsyncIterable = <A, E>(self: Stream.Stream<A, E>): AsyncIterable<A> =>
7263+
toAsyncIterableRuntime(self, Runtime.defaultRuntime)
7264+
7265+
/** @internal */
7266+
export const toAsyncIterableEffect = <A, E, R>(
7267+
self: Stream.Stream<A, E, R>
7268+
): Effect.Effect<AsyncIterable<A>, never, R> =>
7269+
Effect.map(Effect.runtime<R>(), (runtime) => toAsyncIterableRuntime(self, runtime))
7270+
72037271
/** @internal */
72047272
export const unfold = <S, A>(s: S, f: (s: S) => Option.Option<readonly [A, S]>): Stream.Stream<A> =>
72057273
unfoldChunk(s, (s) => pipe(f(s), Option.map(([a, s]) => [Chunk.of(a), s])))

packages/effect/test/Stream/conversions.test.ts

+9
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,13 @@ describe("Stream", () => {
7373
)
7474
deepStrictEqual(queue, Exit.die(new Cause.RuntimeException("die")))
7575
}))
76+
77+
it("toAsyncIterable", async () => {
78+
const stream = Stream.make(1, 2, 3)
79+
const results: Array<number> = []
80+
for await (const result of Stream.toAsyncIterable(stream)) {
81+
results.push(result)
82+
}
83+
deepStrictEqual(results, [1, 2, 3])
84+
})
7685
})

0 commit comments

Comments
 (0)