Skip to content

Commit

Permalink
fix(store): prevent writing to state once action handler is unsubscribed
Browse files Browse the repository at this point in the history
  • Loading branch information
arturovt committed Oct 19, 2024
1 parent b178ff0 commit 9cf8695
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .bundlemonrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
},
{
"path": "./fesm2022/ngxs-store.mjs",
"maxSize": "101kB",
"maxSize": "103kB",
"maxPercentIncrease": 0.5
}
],
Expand Down
3 changes: 1 addition & 2 deletions packages/store/src/actions/action-registry.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Injectable, type OnDestroy } from '@angular/core';
import type { Observable } from 'rxjs';

// action: Instance<ActionType>.
export type ActionHandlerFn = (action: any) => void | Promise<void> | Observable<unknown>;
export type ActionHandlerFn = (action: any) => Observable<unknown>;

@Injectable({ providedIn: 'root' })
export class NgxsActionRegistry implements OnDestroy {
Expand Down
60 changes: 45 additions & 15 deletions packages/store/src/internal/state-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import {
filter,
map,
mergeMap,
shareReplay,
takeUntil
takeUntil,
finalize,
Observable
} from 'rxjs';

import { NgxsConfig } from '../symbols';
Expand Down Expand Up @@ -260,9 +261,9 @@ export class StateFactory implements OnDestroy {
/**
* Invoke actions on the states.
*/
invokeActions(action: any) {
private invokeActions(action: any): Observable<unknown[]> {
const type = getActionTypeFromInstance(action)!;
const results = [];
const results: Observable<unknown>[] = [];

// Determines whether the dispatched action has been handled, this is assigned
// to `true` within the below `for` loop if any `actionMetas` has been found.
Expand All @@ -277,7 +278,7 @@ export class StateFactory implements OnDestroy {
try {
result = actionHandler(action);
} catch (e) {
result = throwError(e);
result = throwError(() => e);
}

results.push(result);
Expand All @@ -297,7 +298,7 @@ export class StateFactory implements OnDestroy {
}

if (!results.length) {
results.push(of({}));
results.push(of(undefined));
}

return forkJoin(results);
Expand Down Expand Up @@ -344,11 +345,15 @@ export class StateFactory implements OnDestroy {
const { dispatched$ } = this._actions;
for (const actionType of Object.keys(actions)) {
const actionHandlers = actions[actionType].map(actionMeta => {
// action: Instance<ActionType>
const cancelable = !!actionMeta.options.cancelUncompleted;

return (action: any) => {
const stateContext = this._stateContextFactory.createStateContext(path);

let result = instance[actionMeta.fn](stateContext, action);
// We explicitly specify `Observable<void>`, which is always the final
// result for the action handler, as the framework does not care about
// the values returned by action handlers.
let result: Observable<void> = instance[actionMeta.fn](stateContext, action);

// We need to use `isPromise` instead of checking whether
// `result instanceof Promise`. In zone.js patched environments, `global.Promise`
Expand All @@ -365,27 +370,47 @@ export class StateFactory implements OnDestroy {
mergeMap((value: any) => {
if (ɵisPromise(value)) {
return from(value);
}
if (isObservable(value)) {
} else if (isObservable(value)) {
return value;
} else {
return of(value);
}
return of(value);
}),
// If this observable has completed without emitting any values,
// we wouldn't want to complete the entire chain of actions.
// If any observable completes, then the action will be canceled.
// For instance, if any action handler had a statement like
// `handler(ctx) { return EMPTY; }`, then the action would be canceled.
// See https://github.com/ngxs/store/issues/1568
defaultIfEmpty({})
// Note that we actually don't care about the return type; we only care
// about emission, and thus `undefined` is applicable by the framework.
defaultIfEmpty(undefined)
);

if (actionMeta.options.cancelUncompleted) {
result = result.pipe(takeUntil(dispatched$.pipe(ofActionDispatched(action))));
if (cancelable) {
const notifier$ = dispatched$.pipe(ofActionDispatched(action));
result = result.pipe(takeUntil(notifier$));
}

result = result.pipe(
// Note that we use the `finalize` operator only when the action handler
// returns an observable. If the action handler is synchronous, we do not
// need to set the state context functions to `noop`, as the absence of a
// return value indicates no asynchronous functionality. If the handler's
// result is unsubscribed (either because the observable has completed or it
// was unsubscribed by `takeUntil` due to a new action being dispatched),
// we prevent writing to the state context.
finalize(() => {
stateContext.setState = noop;
stateContext.patchState = noop;
})
);
} else {
result = of({}).pipe(shareReplay());
// If the action handler is synchronous and returns nothing (`void`), we
// still have to convert the result to a synchronous observable.
result = of(undefined);
}

return result;
};
});
Expand All @@ -396,3 +421,8 @@ export class StateFactory implements OnDestroy {
}
}
}

// This is used to replace `setState` and `patchState` once the action
// handler has been unsubscribed or completed, to prevent writing
// to the state context.
function noop() {}
22 changes: 22 additions & 0 deletions packages/store/tests/helpers/promise-test-helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
export function createPromiseTestHelper<T = void>() {
type MarkResolvedFn = (result: T | PromiseLike<T>) => void;
type MarkRejectedFn = (reason?: any) => void;
let resolveFn: MarkResolvedFn = () => {};
let rejectFn: MarkRejectedFn = () => {};

const promise = new Promise<T>((resolve, reject) => {
resolveFn = resolve;
rejectFn = reject;
});
return {
promise,
markPromiseResolved(...args: Parameters<MarkResolvedFn>) {
resolveFn(...args);
resolveFn = () => {};
},
markPromiseRejected(reason?: any) {
rejectFn(reason);
rejectFn = () => {};
}
};
}
121 changes: 121 additions & 0 deletions packages/store/tests/issues/canceling-promises.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { Injectable } from '@angular/core';
import { TestBed } from '@angular/core/testing';
import { State, Action, Store, provideStore } from '@ngxs/store';

import { createPromiseTestHelper } from '../helpers/promise-test-helper';

// This test essentially shows that microtasks are not cancelable in JavaScript.
// Therefore, using actions that return promises cannot be used with the
// `cancelUncompleted` option, as this will result in the functionality being
// executed twice (once by the executing action and again by the newly dispatched
// action). Any third-party code that returns promises is also not cancelable.
describe('Canceling promises', () => {
const recorder: string[] = [];

class MyActionAwait {
static readonly type = '[MyState] My action await';
}

class MyActionThen {
static readonly type = '[MyState] My action then';
}

const { promise: promiseAwaitReady, markPromiseResolved: markPromiseAwaitReady } =
createPromiseTestHelper();

const { promise: promiseThenReady, markPromiseResolved: markPromiseThenReady } =
createPromiseTestHelper();

@State<string>({
name: 'myState',
defaults: 'STATE_VALUE'
})
@Injectable()
class MyState {
@Action(MyActionAwait, { cancelUncompleted: true })
async handleActionAwait() {
recorder.push('before promise await ready');
await promiseAwaitReady;
recorder.push('after promise await ready');
}

@Action(MyActionThen, { cancelUncompleted: true })
handleActionThen() {
recorder.push('before promise then ready');
return promiseThenReady.then(() => {
recorder.push('after promise then ready');
});
}
}

beforeEach(() => {
recorder.length = 0;

TestBed.configureTestingModule({
providers: [provideStore([MyState])]
});
});

it('canceling promises using `await`', async () => {
// Arrange
const store = TestBed.inject(Store);

// Act
store.dispatch(new MyActionAwait());

// Assert
expect(recorder).toEqual(['before promise await ready']);

// Act (dispatch another action to cancel the previous one)
// The promise is not resolved yet, as thus `await` is not executed.
store.dispatch(new MyActionAwait());

// Assert
expect(recorder).toEqual(['before promise await ready', 'before promise await ready']);

// Act
markPromiseAwaitReady();
await promiseAwaitReady;

// Assert
expect(recorder).toEqual([
'before promise await ready',
'before promise await ready',
// Note that once the promise is resolved, the await has been executed,
// and both microtasks have also been executed (`recorder.push(...)` is a
// microtask because it is created by `await`).
'after promise await ready',
'after promise await ready'
]);
});

it('canceling promises using `then(...)`', async () => {
// Arrange
const store = TestBed.inject(Store);

// Act
store.dispatch(new MyActionThen());

// Assert
expect(recorder).toEqual(['before promise then ready']);

// Act (dispatch another action to cancel the previous one)
// The promise is not resolved yet, as thus `then(...)` is not executed.
store.dispatch(new MyActionThen());

// Assert
expect(recorder).toEqual(['before promise then ready', 'before promise then ready']);

// Act
markPromiseThenReady();
await promiseThenReady;

// Assert
expect(recorder).toEqual([
'before promise then ready',
'before promise then ready',
'after promise then ready',
'after promise then ready'
]);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { Injectable } from '@angular/core';
import { TestBed } from '@angular/core/testing';
import { State, Action, Store, provideStore, StateContext } from '@ngxs/store';

describe('Writing to state after action handler has been unsubscribed', () => {
class Increment {
static readonly type = '[Counter] Increment';
}

@State({
name: 'counter',
defaults: 0
})
@Injectable()
class CounterState {
@Action(Increment, { cancelUncompleted: true })
async handleActionAwait(ctx: StateContext<number>) {
await Promise.resolve();
ctx.setState(counter => counter + 1);
}
}

const testSetup = () => {
TestBed.configureTestingModule({
providers: [provideStore([CounterState])]
});

return TestBed.inject(Store);
};

it('should not write to state if the action has been canceled', async () => {
// Arrange
const store = testSetup();

// Act
store.dispatch(new Increment());
store.dispatch(new Increment());
store.dispatch(new Increment());
await Promise.resolve();

// Assert
expect(store.snapshot()).toEqual({ counter: 1 });
});
});

0 comments on commit 9cf8695

Please sign in to comment.