Skip to content

Latest commit

 

History

History
291 lines (218 loc) · 7.19 KB

README.md

File metadata and controls

291 lines (218 loc) · 7.19 KB

asyncutil

JSR Test Codecov

Asynchronous primitive utility pack.

Usage

AsyncValue

AsyncValue is a class that wraps a value and allows it to be set asynchronously.

import { assertEquals } from "@std/assert";
import { AsyncValue } from "@core/asyncutil/async-value";

const v = new AsyncValue(0);
assertEquals(await v.get(), 0);
await v.set(1);
assertEquals(await v.get(), 1);

Barrier

Barrier is a synchronization primitive that allows multiple tasks to wait until all of them have reached a certain point of execution before continuing.

import { Barrier } from "@core/asyncutil/barrier";

const barrier = new Barrier(3);

async function worker(id: number) {
  console.log(`worker ${id} is waiting`);
  await barrier.wait();
  console.log(`worker ${id} is done`);
}

worker(1);
worker(2);
worker(3);

ensurePromise

ensurePromise is a utility function that ensures a value is a promise.

import { ensurePromise } from "@core/asyncutil/ensure-promise";

const p1 = ensurePromise(Promise.resolve("Resolved promise"));
console.log(await p1); // Resolved promise

const p2 = ensurePromise("Not a promise");
console.log(await p2); // Not a promise

flushPromises

flushPromises flushes all pending promises in the microtask queue.

import { flushPromises } from "@core/asyncutil/flush-promises";

let count = 0;
Array.from({ length: 5 }).forEach(() => {
  Promise.resolve()
    .then(() => count++)
    .then(() => count++);
});

console.log(count); // 0
await flushPromises();
console.log(count); // 10

Lock/RwLock

Lock is a mutual exclusion lock that provides safe concurrent access to a shared value.

import { AsyncValue } from "@core/asyncutil/async-value";
import { Lock } from "@core/asyncutil/lock";

// Critical section
const count = new Lock(new AsyncValue(0));
await count.lock(async (count) => {
  const v = await count.get();
  count.set(v + 1);
});

RwLock is a reader-writer lock implementation that allows multiple concurrent reads but only one write at a time. Readers can acquire the lock simultaneously as long as there are no writers holding the lock. Writers block all other readers and writers until the write operation completes.

import { AsyncValue } from "@core/asyncutil/async-value";
import { RwLock } from "@core/asyncutil/rw-lock";

const count = new RwLock(new AsyncValue(0));

// rlock should allow multiple readers at a time
await Promise.all(
  [...Array(10)].map(() => {
    return count.rlock(async (count) => {
      console.log(await count.get());
    });
  }),
);

// lock should allow only one writer at a time
await Promise.all(
  [...Array(10)].map(() => {
    return count.lock(async (count) => {
      const v = await count.get();
      console.log(v);
      count.set(v + 1);
    });
  }),
);

Mutex

Mutex is a mutex (mutual exclusion) is a synchronization primitive that grants exclusive access to a shared resource.

This is a low-level primitive. Use Lock instead of Mutex if you need to access a shared value concurrently.

import { AsyncValue } from "@core/asyncutil/async-value";
import { Mutex } from "@core/asyncutil/mutex";

const count = new AsyncValue(0);

async function doSomething() {
  const v = await count.get();
  await count.set(v + 1);
}

const mu = new Mutex();

// Critical section
{
  using _lock = await mu.acquire();
  await doSomething();
}

Notify

Notify is an async notifier that allows one or more "waiters" to wait for a notification.

import { assertEquals } from "@std/assert";
import { promiseState } from "@core/asyncutil/promise-state";
import { Notify } from "@core/asyncutil/notify";

const notify = new Notify();
const waiter1 = notify.notified();
const waiter2 = notify.notified();
notify.notify();
assertEquals(await promiseState(waiter1), "fulfilled");
assertEquals(await promiseState(waiter2), "pending");
notify.notify();
assertEquals(await promiseState(waiter1), "fulfilled");
assertEquals(await promiseState(waiter2), "fulfilled");

peekPromiseState

peekPromiseState is used to determine the state of the promise. Mainly for testing purpose.

import { peekPromiseState } from "@core/asyncutil/peek-promise-state";

const p1 = Promise.resolve("Resolved promise");
console.log(await peekPromiseState(p1)); // fulfilled

const p2 = Promise.reject("Rejected promise").catch(() => undefined);
console.log(await peekPromiseState(p2)); // rejected

const p3 = new Promise(() => undefined);
console.log(await peekPromiseState(p3)); // pending

Use flushPromises to wait all pending promises to resolve.

import { flushPromises } from "@core/asyncutil/flush-promises";
import { peekPromiseState } from "@core/asyncutil/peek-promise-state";

const p = Promise.resolve<void>(undefined)
  .then(() => {})
  .then(() => {});

console.log(await peekPromiseState(p)); // pending
await flushPromises();
console.log(await peekPromiseState(p)); // fulfilled

Queue/Stack

Queue is a queue implementation that allows for adding and removing elements, with optional waiting when popping elements from an empty queue.

import { assertEquals } from "@std/assert";
import { Queue } from "@core/asyncutil/queue";

const queue = new Queue<number>();
queue.push(1);
queue.push(2);
queue.push(3);
assertEquals(await queue.pop(), 1);
assertEquals(await queue.pop(), 2);
assertEquals(await queue.pop(), 3);

Stack is a stack implementation that allows for adding and removing elements, with optional waiting when popping elements from an empty stack.

import { assertEquals } from "@std/assert";
import { Stack } from "@core/asyncutil/stack";

const stack = new Stack<number>();
stack.push(1);
stack.push(2);
stack.push(3);
assertEquals(await stack.pop(), 3);
assertEquals(await stack.pop(), 2);
assertEquals(await stack.pop(), 1);

Semaphore

A semaphore that allows a limited number of concurrent executions of an operation.

import { Semaphore } from "@core/asyncutil/semaphore";

const sem = new Semaphore(5);
const worker = () => {
  return sem.lock(async () => {
    // do something
  });
};
await Promise.all([...Array(10)].map(() => worker()));

WaitGroup

WaitGroup is a synchronization primitive that enables promises to coordinate and synchronize their execution. It is particularly useful in scenarios where a specific number of tasks must complete before the program can proceed.

import { delay } from "@std/async/delay";
import { WaitGroup } from "@core/asyncutil/wait-group";

const wg = new WaitGroup();

async function worker(id: number) {
  wg.add(1);
  console.log(`worker ${id} is waiting`);
  await delay(100);
  console.log(`worker ${id} is done`);
  wg.done();
}

worker(1);
worker(2);
worker(3);
await wg.wait();

License

The code follows MIT license written in LICENSE. Contributors need to agree that any modifications sent in this repository follow the license.