Skip to content

fix(asynciterable): use more yield #379

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

jeengbe
Copy link
Contributor

@jeengbe jeengbe commented Jan 19, 2025

I went through the async-iterable functions and swapped out custom while loops with yield where I could. Also took the liberty to clean up some of the code while at it. The benefit of yield* over custom code is that it takes care of returning the wrapped iterator, etc.


As a consequence of microsoft/TypeScript#61022, the following pattern fails for operators ( concat) implemented using yield* on targets that use <= ES2017 (passes for higher targets):

test('yield*', async () => {
  let i = 0;

  async function* asyncGenerator() {
    i++;
    yield 1;
  }

  const res = concat(asyncGenerator(), asyncGenerator()).pipe(take(1));
  const items = await toArray(res);

  expect(i).toBe(1); // Actually is 2 because loop continues
});

So even though yield* should be able to be used in most places, because of this, I went with loops and yield.

@jeengbe

This comment was marked as outdated.

@jeengbe

This comment was marked as outdated.

@jeengbe jeengbe marked this pull request as ready for review January 22, 2025 19:34
@trxcllnt
Copy link
Member

Wow that TS issue is concerning. Is the addition of the returnAsyncIterators function related to that change? It looks to be materially different from the existing behavior, e.g. it doesn't await the it.return() Promises.

@jeengbe
Copy link
Contributor Author

jeengbe commented Jan 24, 2025

The bug just means that we can't use yield* and must instead manually iterate and yield individual items. For all we care here, both are functionally equivalent.

I'm not fully certain about what the best play with returning iterators is, but (thinking out loud here), if you look at for example timeout():

try {
  while (1) {
    const { type, value } = await safeRace<TimeoutOperation<TSource>>([
      it.next().then((val) => {
        return { type: VALUE_TYPE, value: val };
      }),
      sleep(this._dueTime, signal).then(() => {
        return { type: ERROR_TYPE };
      }),
    ]);

    if (type === ERROR_TYPE) {
      throw new TimeoutError();
    }

    if (!value || value.done) {
      break;
    }
    yield value.value;
  }
} finally {
  await it?.return?.();
}

What we want in finally is not necessarily to return the iterator, but to abort it instead. The following test should pass, but currently doesn't. It hangs, and Jest eventually kills the run.

test('AsyncIterable#timeout with never', async () => {
  const xs = never().pipe(timeout(1000));

  const it = xs[Symbol.asyncIterator]();
  await noNext(it);
});

The following snippet hopefully demonstrates why we can't await it.return() calls.

(async () => {
    setTimeout(() => { }, 10e3); // Keep event loop running

    async function* a() {
        await new Promise(() => { });
    }

    const it = a();
    void it.next();
    await it.return();
    console.log("Done")
})()

If you return a generator that's currently running (the unresolved it.next() is important. If you remove it, it works as expected), the it.return() promise hangs indefinitely.

To be fully correct when we race iterators, we should actually abort whichever don't finish the race (if you instead return them, you get the above scenario), and only the one that yielded first should eventually return. What such an implementation, we would not need #378 at all, since bufferCountOrTime would instead abort the interval.


The following test currently passes on master, even though it technically should not (with the current return implementation).

test('canceled', async () => {
  let canceled = false;

  async function* generate() {
    try {
      for (let i = 0; ; i++) {
        await delay(100);
        yield i;
      }
    } finally {
      canceled = true;
    }
  }

  const it = batch()(generate())[Symbol.asyncIterator]();

  await delay(150);
  expect(await it.next()).toEqual({ done: false, value: [0] });

  expect(await it.return!()).toEqual({ done: true });
  expect(canceled).toBe(true);
});

The test only succeeds because when you return, it awaits the remaining delay and eventually returns from the suspended position. If you crank up the delay and run something like the following instead, the test no longer passes.

test('canceled', async () => {
  let canceled = false;

  async function* generate() {
    try {
      for (let i = 0; ; i++) {
        await delay(10000);
        yield i;
      }
    } finally {
      canceled = true;
    }
  }

  const it = batch()(generate())[Symbol.asyncIterator]();

  expect(await it.return!()).toEqual({ done: true });
  expect(canceled).toBe(true);
});

This test would pass if and only if we instead aborted the delay when the buffered iterator is returned.

For that reason, I have removed tests like these that rely on await it.return() to continue execution until the next yield is reached.


In conclusion, I think, there is no scenario in which we can blindly await it!.return?.(); and expect it to work. We would always need to keep track of whether any generators are currently suspended or executing, and either return or abort accordingly. For now, however, void it!.return?.(); has to do, I would say.

@trxcllnt
Copy link
Member

trxcllnt commented May 5, 2025

From the linked TS issue, it seems the fix was targeted for the TS v5.8.0 milestone. Maybe we should try updating the TS version to see if this works now?

@trxcllnt
Copy link
Member

trxcllnt commented May 7, 2025

I am not happy with the closure compiler bug causing the UMD builds to fail... I'm inclined to migrate away from closure compiler for optimization/minification.

@trxcllnt
Copy link
Member

trxcllnt commented May 7, 2025

What we want in finally is not necessarily to return the iterator, but to abort it instead.
...
If you return a generator that's currently running (the unresolved it.next() is important. If you remove it, it works as expected), the it.return() promise hangs indefinitely.
...
To be fully correct when we race iterators, we should actually abort whichever don't finish the race (if you instead return them, you get the above scenario), and only the one that yielded first should eventually return.

If I understand correctly, you want an AsyncIterator that tracks each Promise returned by next(), and aborts them prematurely if the user calls throw() or return()? That sounds like a fundamental change to the AsyncIterator protocol in JS (which to be clear -- I would support), but that ship has sailed.

We could use an existing (or implement our own) cancelable Promise type. That would essentially bring it in line with Task, which would more closely align with the original .NET implementation.

Unfortunately this would not compose, as there's many occasions for users to implicitly drop out of the Ix ecosystem and revert to using native Promises instead. Even a simple Ix.from(async function*() {}) would give someone the opportunity to do what you've done here. I suppose we could still wrap their Promises in something that still respects the AbortSignal (and turns a blind eye to whether the underlying Promise ever returns or rejects), but that sounds inefficient and gross.

// Calling return on a generator that is currently executing should throw a TypeError, so we can't
// just await the return call of any iterator. To be fully correct, we should instead abort instead
// of returning in most situations, but for now, this will do.
// TODO: Send a signal to the other iterators to stop
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// TODO: Send a signal to the other iterators to stop
// TODO: Send a signal to the pending `next()` Promises to stop

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function seems useful, but I'm not sure it does what you want, or if it's even possible to do what you want. From your example above:

(async () => {
  setTimeout(() => { }, 10e3); // Keep event loop running

  async function* a() {
    try {
      console.log("awaiting never promise");
      await new Promise(() => { });
    } finally {
      console.log("exiting a()");
    }
  }

  console.log("calling a()");
  const it = a();
  console.log("calling it.next()");
  void it.next();
  console.log("calling it.return()");
  void it.return();
  console.log("Done");
})()

If I run this, I never see the "exiting a()" log in the finally block executed:

$ node test.js 
calling a()
calling it.next()
awaiting never promise
calling it.return()
Done

Copy link
Contributor Author

@jeengbe jeengbe May 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need to leverage the extra AbortSignal in the AsyncIterableX protocol (abstract [Symbol.asyncIterator](signal?: AbortSignal): AsyncIterator<T>) to signal cancellation instead of returning an iterator that is potentially not in the correct state to call return on. Something like the following:

export class ZipAsyncIterable<TSource> extends AsyncIterableX<TSource[]> {
  private _sources: AsyncIterable<TSource>[];

  constructor(sources: AsyncIterable<TSource>[]) {
    super();
    this._sources = sources;
  }

  // eslint-disable-next-line consistent-return
  async *[Symbol.asyncIterator](signal?: AbortSignal): AsyncIterableIterator<TSource[]> {
    throwIfAborted(signal);

    if (this._sources.length === 0) {
      return;
    }

    const innerController = new AbortController();
    signal?.addEventListener('abort', () => {
      innerController.abort();
    });

    const iterators = this._sources.map((x) => wrapWithAbort(x, innerController.signal)[Symbol.asyncIterator]());

    try {
      while (1) {
        const results = await Promise.all(iterators.map((x) => x.next()));

        if (results.some(({ done }) => done)) {
          return;
        }

        yield results.map(({ value }) => value);
      }
    } finally {
      innerController.abort();
    }
  }
}

which is a bigger change that extracting returnAsyncIterators. Its purpose is cleaning up values/triggering finally blocks, which it does mostly good enough still.

@jeengbe
Copy link
Contributor Author

jeengbe commented May 11, 2025

From the linked TS issue, it seems the fix was targeted for the TS v5.8.0 milestone. Maybe we should try updating the TS version to see if this works now?

Doesn't seem to be fixed yet, unfortunately: playground

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants