Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace runWithTransactions to flushCallbacks finalizer for Improved Flow Control and Compatibility #2946

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 65 additions & 48 deletions src/vanilla/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ const buildStore = (...storeArgs: StoreArgs): Store => {
const unmountCallbacks = new Set<() => void>()
const mountCallbacks = new Set<() => void>()

let inTransaction = 0
const runWithTransaction = <T>(fn: () => T): T => {
const flushCallbacks = () => {
const errors: unknown[] = []
const call = (fn: () => void) => {
try {
Expand All @@ -258,36 +257,22 @@ const buildStore = (...storeArgs: StoreArgs): Store => {
errors.push(e)
}
}
let result: T
++inTransaction
try {
result = fn()
} finally {
if (inTransaction === 1) {
while (
changedAtoms.size ||
unmountCallbacks.size ||
mountCallbacks.size
) {
recomputeInvalidatedAtoms()
;(store as any)[INTERNAL_flushStoreHook]?.()
const callbacks = new Set<() => void>()
const add = callbacks.add.bind(callbacks)
changedAtoms.forEach((atomState) => atomState.m?.l.forEach(add))
changedAtoms.clear()
unmountCallbacks.forEach(add)
unmountCallbacks.clear()
mountCallbacks.forEach(add)
mountCallbacks.clear()
callbacks.forEach(call)
}
}
--inTransaction
while (changedAtoms.size || unmountCallbacks.size || mountCallbacks.size) {
recomputeInvalidatedAtoms()
;(store as any)[INTERNAL_flushStoreHook]?.()
const callbacks = new Set<() => void>()
const add = callbacks.add.bind(callbacks)
changedAtoms.forEach((atomState) => atomState.m?.l.forEach(add))
changedAtoms.clear()
unmountCallbacks.forEach(add)
unmountCallbacks.clear()
mountCallbacks.forEach(add)
mountCallbacks.clear()
callbacks.forEach(call)
}
if (errors.length) {
throw errors[0]
}
return result
}

const setAtomStateValueOrPromise = (
Expand Down Expand Up @@ -344,7 +329,8 @@ const buildStore = (...storeArgs: StoreArgs): Store => {
let isSync = true
const mountDependenciesIfAsync = () => {
if (atomState.m) {
runWithTransaction(() => mountDependencies(atom, atomState))
mountDependencies(atom, atomState)
flushCallbacks()
}
}
const getter: Getter = <V>(a: Atom<V>) => {
Expand Down Expand Up @@ -529,13 +515,14 @@ const buildStore = (...storeArgs: StoreArgs): Store => {
atom: WritableAtom<Value, Args, Result>,
...args: Args
): Result => {
let isSync = true
const getter: Getter = <V>(a: Atom<V>) => returnAtomValue(readAtomState(a))
const setter: Setter = <V, As extends unknown[], R>(
a: WritableAtom<V, As, R>,
...args: As
) => {
const aState = ensureAtomState(a)
return runWithTransaction(() => {
try {
const aState = ensureAtomState(a)
if (isSelfAtom(atom, a)) {
if (!hasInitialValue(a)) {
// NOTE technically possible but restricted as it may cause bugs
Expand All @@ -554,15 +541,29 @@ const buildStore = (...storeArgs: StoreArgs): Store => {
} else {
return writeAtomState(a, ...args)
}
})
} finally {
if (!isSync) {
flushCallbacks()
}
}
}
try {
return atomWrite(atom, getter, setter, ...args)
} finally {
isSync = false
}
return atomWrite(atom, getter, setter, ...args)
}

const writeAtom = <Value, Args extends unknown[], Result>(
atom: WritableAtom<Value, Args, Result>,
...args: Args
): Result => runWithTransaction(() => writeAtomState(atom, ...args))
): Result => {
try {
return writeAtomState(atom, ...args)
} finally {
flushCallbacks()
}
}

const mountDependencies = (atom: AnyAtom, atomState: AtomState) => {
if (atomState.m && !isPendingPromise(atomState.v)) {
Expand Down Expand Up @@ -604,12 +605,30 @@ const buildStore = (...storeArgs: StoreArgs): Store => {
atomState.h?.()
if (isActuallyWritableAtom(atom)) {
const mounted = atomState.m
let setAtom: (...args: unknown[]) => unknown
const createInvocationContext = <T>(fn: () => T) => {
let isSync = true
setAtom = (...args: unknown[]) => {
try {
return writeAtomState(atom, ...args)
} finally {
if (!isSync) {
flushCallbacks()
}
}
}
try {
return fn()
} finally {
isSync = false
}
}
const processOnMount = () => {
const onUnmount = atomOnMount(atom, (...args) =>
runWithTransaction(() => writeAtomState(atom, ...args)),
const onUnmount = createInvocationContext(() =>
atomOnMount(atom, (...args) => setAtom(...args)),
)
if (onUnmount) {
mounted.u = onUnmount
mounted.u = () => createInvocationContext(onUnmount)
}
}
mountCallbacks.add(processOnMount)
Expand Down Expand Up @@ -646,17 +665,15 @@ const buildStore = (...storeArgs: StoreArgs): Store => {

const subscribeAtom = (atom: AnyAtom, listener: () => void) => {
const atomState = ensureAtomState(atom)
return runWithTransaction(() => {
const mounted = mountAtom(atom, atomState)
const listeners = mounted.l
listeners.add(listener)
return () => {
runWithTransaction(() => {
listeners.delete(listener)
unmountAtom(atom, atomState)
})
}
})
const mounted = mountAtom(atom, atomState)
const listeners = mounted.l
listeners.add(listener)
flushCallbacks()
return () => {
listeners.delete(listener)
unmountAtom(atom, atomState)
flushCallbacks()
}
}

const unstable_derive: Store['unstable_derive'] = (fn) =>
Expand Down
Loading