Skip to content

Commit 07c1900

Browse files
committed
fix(asynciterable): use more yield
1 parent 612fa41 commit 07c1900

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

115 files changed

+816
-748
lines changed

spec/asynciterable-operators/batch-spec.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ test('done while waiting', async () => {
4949
expect(await it.next()).toEqual({ done: true });
5050
});
5151

52-
test('canceled', async () => {
52+
// eslint-disable-next-line jest/no-disabled-tests -- See https://github.com/ReactiveX/IxJS/pull/379#issuecomment-2611883590
53+
test.skip('canceled', async () => {
5354
let canceled = false;
5455

5556
async function* generate() {

spec/asynciterable-operators/finalize-spec.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { hasNext, hasErr, noNext } from '../asynciterablehelpers.js';
22
import { range, throwError } from 'ix/asynciterable/index.js';
3-
import { flatMap, finalize, tap } from 'ix/asynciterable/operators/index.js';
3+
import { finalize, tap, flatMap } from 'ix/asynciterable/operators/index.js';
44

55
test('AsyncIterable#finalize defers behavior', async () => {
66
let done = false;

spec/asynciterable-operators/mergeall-spec.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ import { mergeAll } from 'ix/asynciterable/operators/index.js';
44

55
test('AsyncIterable#merge mergeAll behavior', async () => {
66
const res = of(of(1, 2, 3), of(4, 5)).pipe(mergeAll());
7-
expect(await toArray(res)).toEqual([1, 2, 4, 3, 5]);
7+
expect(await toArray(res)).toEqual([1, 4, 2, 5, 3]);
88
});

spec/asynciterable-operators/timeout-spec.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ test('AsyncIterable#timeout throws when delayed', async () => {
3131
await noNext(it);
3232
});
3333

34-
test('AsyncIterable#timeout triggers finalize', async () => {
34+
// eslint-disable-next-line jest/no-disabled-tests -- See https://github.com/ReactiveX/IxJS/pull/379#issuecomment-2611883590
35+
test.skip('AsyncIterable#timeout triggers finalize', async () => {
3536
let done = false;
3637
const xs = async function* () {
3738
yield await delayValue(1, 500);
@@ -48,5 +49,6 @@ test('AsyncIterable#timeout triggers finalize', async () => {
4849
await hasNext(it, 1);
4950
await hasErr(it, TimeoutError);
5051
await noNext(it);
52+
await new Promise((res) => setTimeout(res, 10));
5153
expect(done).toBeTruthy();
5254
});

spec/asynciterable/concat-spec.ts

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,25 @@
1+
import { take } from 'ix/asynciterable/operators.js';
12
import '../asynciterablehelpers.js';
2-
import { concat, of, sequenceEqual } from 'ix/asynciterable/index.js';
3+
import { concat, of, sequenceEqual, toArray } from 'ix/asynciterable/index.js';
34

45
test('AsyncIterable#concat behavior', async () => {
56
const res = concat(of(1, 2, 3), of(4, 5));
67
expect(await sequenceEqual(res, of(1, 2, 3, 4, 5))).toBeTruthy();
78
});
9+
10+
test("AsyncIterable#concat doesn't execute more than necessary", async () => {
11+
let i = 0;
12+
13+
async function* asyncGenerator() {
14+
i++;
15+
yield 1;
16+
}
17+
18+
const res = concat(asyncGenerator(), asyncGenerator()).pipe(take(1));
19+
const items = await toArray(res);
20+
21+
expect(items).toEqual([1]);
22+
// This second generator should not be started at all since the first one
23+
// provides enough values
24+
expect(i).toBe(1);
25+
});

src/add/asynciterable-operators/mergeall.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export function mergeAllProto<T>(
88
this: AsyncIterableX<AsyncIterable<T>>,
99
concurrent = Infinity
1010
): AsyncIterableX<T> {
11-
return mergeAll(concurrent)(this);
11+
return mergeAll<T>(concurrent)(this);
1212
}
1313

1414
AsyncIterableX.prototype.mergeAll = mergeAllProto;

src/asynciterable/_extremaby.ts

+21-19
Original file line numberDiff line numberDiff line change
@@ -9,29 +9,31 @@ export async function extremaBy<TSource, TKey>(
99
): Promise<TSource[]> {
1010
throwIfAborted(signal);
1111

12-
let result = [];
13-
const it = wrapWithAbort(source, signal)[Symbol.asyncIterator]();
14-
const { value, done } = await it.next();
15-
if (done) {
16-
throw new Error('Sequence contains no elements');
17-
}
18-
19-
let resKey = await selector(value, signal);
20-
result.push(value);
12+
let hasValue = false;
13+
let key: TKey | undefined;
14+
let result: TSource[] = [];
2115

22-
let next: IteratorResult<TSource>;
23-
while (!(next = await it.next()).done) {
24-
const current = next.value;
25-
const key = await selector(current, signal);
26-
const cmp = await comparer(key, resKey, signal);
16+
for await (const item of wrapWithAbort(source, signal)) {
17+
if (!hasValue) {
18+
key = await selector(item, signal);
19+
result.push(item);
20+
hasValue = true;
21+
} else {
22+
const currentKey = await selector(item, signal);
23+
const cmp = await comparer(currentKey, key as TKey, signal);
2724

28-
if (cmp === 0) {
29-
result.push(current);
30-
} else if (cmp > 0) {
31-
result = [current];
32-
resKey = key;
25+
if (cmp === 0) {
26+
result.push(item);
27+
} else if (cmp > 0) {
28+
result = [item];
29+
key = currentKey;
30+
}
3331
}
3432
}
3533

34+
if (!hasValue) {
35+
throw new Error('Sequence contains no elements');
36+
}
37+
3638
return result;
3739
}

src/asynciterable/asynciterablex.ts

+1-2
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,7 @@ export class FromPromiseIterable<TSource, TResult = TSource> extends AsyncIterab
290290
}
291291

292292
async *[Symbol.asyncIterator]() {
293-
const item = await this._source;
294-
yield await this._selector(item, 0);
293+
yield await this._selector(await this._source, 0);
295294
}
296295
}
297296

src/asynciterable/average.ts

+3
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,12 @@ export async function average(
4242
['signal']: signal,
4343
['thisArg']: thisArg,
4444
} = options || {};
45+
4546
throwIfAborted(signal);
47+
4648
let sum = 0;
4749
let count = 0;
50+
4851
for await (const item of wrapWithAbort(source, signal)) {
4952
sum += await selector.call(thisArg, item, signal);
5053
count++;

src/asynciterable/catcherror.ts

+8-22
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { AsyncIterableX } from './asynciterablex.js';
2-
import { returnAsyncIterator } from '../util/returniterator.js';
32
import { wrapWithAbort } from './operators/withabort.js';
43
import { throwIfAborted } from '../aborterror.js';
54

@@ -19,29 +18,16 @@ export class CatchAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
1918
let hasError = false;
2019

2120
for (const source of this._source) {
22-
const it = wrapWithAbort(source, signal)[Symbol.asyncIterator]();
23-
2421
error = null;
2522
hasError = false;
2623

27-
while (1) {
28-
let c = <TSource>{};
29-
30-
try {
31-
const { done, value } = await it.next();
32-
if (done) {
33-
await returnAsyncIterator(it);
34-
break;
35-
}
36-
c = value;
37-
} catch (e) {
38-
error = e;
39-
hasError = true;
40-
await returnAsyncIterator(it);
41-
break;
24+
try {
25+
for await (const item of wrapWithAbort(source, signal)) {
26+
yield item;
4227
}
43-
44-
yield c;
28+
} catch (e) {
29+
error = e;
30+
hasError = true;
4531
}
4632

4733
if (!hasError) {
@@ -64,7 +50,7 @@ export class CatchAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
6450
* sequences until a source sequence terminates successfully.
6551
*/
6652
export function catchAll<T>(source: Iterable<AsyncIterable<T>>): AsyncIterableX<T> {
67-
return new CatchAllAsyncIterable<T>(source);
53+
return new CatchAllAsyncIterable(source);
6854
}
6955

7056
/**
@@ -76,5 +62,5 @@ export function catchAll<T>(source: Iterable<AsyncIterable<T>>): AsyncIterableX<
7662
* sequences until a source sequence terminates successfully.
7763
*/
7864
export function catchError<T>(...args: AsyncIterable<T>[]): AsyncIterableX<T> {
79-
return new CatchAllAsyncIterable<T>(args);
65+
return new CatchAllAsyncIterable(args);
8066
}

src/asynciterable/combinelatest.ts

+27-23
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ import { identity } from '../util/identity.js';
33
import { wrapWithAbort } from './operators/withabort.js';
44
import { throwIfAborted } from '../aborterror.js';
55
import { safeRace } from '../util/safeRace.js';
6+
import { returnAsyncIterators } from '../util/returniterator.js';
67

78
// eslint-disable-next-line @typescript-eslint/no-empty-function
8-
const NEVER_PROMISE = new Promise(() => {});
9+
const NEVER_PROMISE = new Promise<never>(() => {});
910

1011
type MergeResult<T> = { value: T; index: number };
1112

@@ -28,39 +29,42 @@ export class CombineLatestAsyncIterable<TSource> extends AsyncIterableX<TSource[
2829
const length = this._sources.length;
2930
const iterators = new Array<AsyncIterator<TSource>>(length);
3031
const nexts = new Array<Promise<MergeResult<IteratorResult<TSource>>>>(length);
31-
let hasValueAll = false;
32-
const values = new Array<TSource>(length);
33-
const hasValues = new Array<boolean>(length);
34-
let active = length;
3532

36-
hasValues.fill(false);
33+
let active = length;
34+
let allValuesAvailable = false;
35+
const values = new Array<TSource>(length);
36+
const hasValues = new Array<boolean>(length).fill(false);
3737

3838
for (let i = 0; i < length; i++) {
3939
const iterator = wrapWithAbort(this._sources[i], signal)[Symbol.asyncIterator]();
4040
iterators[i] = iterator;
4141
nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
4242
}
4343

44-
while (active > 0) {
45-
const next = safeRace(nexts);
46-
const {
47-
value: { value: value$, done: done$ },
48-
index,
49-
} = await next;
50-
if (done$) {
51-
nexts[index] = <Promise<MergeResult<IteratorResult<TSource>>>>NEVER_PROMISE;
52-
active--;
53-
} else {
54-
values[index] = value$;
55-
hasValues[index] = true;
44+
try {
45+
while (active > 0) {
46+
const {
47+
value: { value, done },
48+
index,
49+
} = await safeRace(nexts);
50+
51+
if (done) {
52+
nexts[index] = NEVER_PROMISE;
53+
active--;
54+
} else {
55+
values[index] = value;
56+
hasValues[index] = true;
57+
allValuesAvailable = allValuesAvailable || hasValues.every(identity);
5658

57-
const iterator$ = iterators[index];
58-
nexts[index] = wrapPromiseWithIndex(iterator$.next(), index);
59+
nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index);
5960

60-
if (hasValueAll || (hasValueAll = hasValues.every(identity))) {
61-
yield values;
61+
if (allValuesAvailable) {
62+
yield values;
63+
}
6264
}
6365
}
66+
} finally {
67+
await returnAsyncIterators(iterators);
6468
}
6569
}
6670
}
@@ -176,5 +180,5 @@ export function combineLatest<T, T2, T3, T4, T5, T6>(
176180
*/
177181
export function combineLatest<T>(...sources: AsyncIterable<T>[]): AsyncIterableX<T[]>;
178182
export function combineLatest<T>(...sources: any[]): AsyncIterableX<T[]> {
179-
return new CombineLatestAsyncIterable<T>(sources);
183+
return new CombineLatestAsyncIterable(sources);
180184
}

src/asynciterable/concat.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export class ConcatAsyncIterable<TSource> extends AsyncIterableX<TSource> {
1313

1414
async *[Symbol.asyncIterator](signal?: AbortSignal) {
1515
throwIfAborted(signal);
16+
1617
for (const outer of this._source) {
1718
for await (const item of wrapWithAbort(outer, signal)) {
1819
yield item;
@@ -24,7 +25,7 @@ export class ConcatAsyncIterable<TSource> extends AsyncIterableX<TSource> {
2425
export function _concatAll<TSource>(
2526
source: Iterable<AsyncIterable<TSource>>
2627
): AsyncIterableX<TSource> {
27-
return new ConcatAsyncIterable<TSource>(source);
28+
return new ConcatAsyncIterable(source);
2829
}
2930

3031
/**
@@ -136,5 +137,5 @@ export function concat<T, T2, T3, T4, T5, T6>(
136137
* @returns {AsyncIterableX<T>} An async-iterable sequence that contains the elements of each given sequence, in sequential order.
137138
*/
138139
export function concat<T>(...args: AsyncIterable<T>[]): AsyncIterableX<T> {
139-
return new ConcatAsyncIterable<T>(args);
140+
return new ConcatAsyncIterable(args);
140141
}

src/asynciterable/count.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ export async function count<T>(
1818
): Promise<number> {
1919
const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate = async () => true } =
2020
options || {};
21+
2122
throwIfAborted(signal);
22-
let i = 0;
2323

24+
let i = 0;
2425
for await (const item of wrapWithAbort(source, signal)) {
2526
if (await predicate.call(thisArg, item, i, signal)) {
2627
i++;

src/asynciterable/create.ts

+6-3
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@ class AnonymousAsyncIterable<T> extends AsyncIterableX<T> {
1111

1212
async *[Symbol.asyncIterator](signal?: AbortSignal) {
1313
throwIfAborted(signal);
14+
1415
const it = await this._fn(signal);
15-
let next: IteratorResult<T> | undefined;
16-
while (!(next = await it.next()).done) {
17-
yield next.value;
16+
17+
for await (const item of {
18+
[Symbol.asyncIterator]: () => it,
19+
}) {
20+
yield item;
1821
}
1922
}
2023
}

src/asynciterable/defer.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ class DeferAsyncIterable<TSource> extends AsyncIterableX<TSource> {
1414

1515
async *[Symbol.asyncIterator](signal?: AbortSignal) {
1616
throwIfAborted(signal);
17-
const items = await this._fn(signal);
18-
for await (const item of wrapWithAbort(items, signal)) {
17+
18+
for await (const item of wrapWithAbort(await this._fn(signal), signal)) {
1919
yield item;
2020
}
2121
}
@@ -32,5 +32,5 @@ class DeferAsyncIterable<TSource> extends AsyncIterableX<TSource> {
3232
export function defer<TSource>(
3333
factory: (signal?: AbortSignal) => AsyncIterable<TSource> | Promise<AsyncIterable<TSource>>
3434
): AsyncIterableX<TSource> {
35-
return new DeferAsyncIterable<TSource>(factory);
35+
return new DeferAsyncIterable(factory);
3636
}

src/asynciterable/elementat.ts

+2
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ export async function elementAt<T>(
1717
signal?: AbortSignal
1818
): Promise<T | undefined> {
1919
throwIfAborted(signal);
20+
2021
let i = index;
2122
for await (const item of wrapWithAbort(source, signal)) {
2223
if (i === 0) {
2324
return item;
2425
}
2526
i--;
2627
}
28+
2729
return undefined;
2830
}

src/asynciterable/every.ts

+3
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@ export async function every<T>(
1616
options: FindOptions<T>
1717
): Promise<boolean> {
1818
const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate } = options;
19+
1920
throwIfAborted(signal);
21+
2022
let i = 0;
2123
for await (const item of wrapWithAbort(source, signal)) {
2224
if (!(await predicate.call(thisArg, item, i++, signal))) {
2325
return false;
2426
}
2527
}
28+
2629
return true;
2730
}

0 commit comments

Comments
 (0)