Skip to content

Commit

Permalink
Action stream consolidation (#324)
Browse files Browse the repository at this point in the history
* refactor: extract dispatcher class from store

* refactor: decouple dispatcher from action handlers

(PS. 5 failing unit tests at this point)

* fix: ensure actions appear in action stream in correct order

(PS: 17 tests failing now! )
( Seems some plugins may have synchronous order dependencies in their code )

* fix: dispatcher should call action handlers synchronously

The completions will return from within a different task

* fix: dispatcher observable should be called synchronously if possible

The completion events in the action stream will still return asynchronously

* test: fix tests to expected events that at each point in the lifecycle

(2 tests failing at this point. We need a canceled Action Status!)

* fix: canceled action should notify of cancellation

* feat: add ofActionCanceled operator

* fix: correct spelling of cancelled (UK) to canceled (US)

* refactor: extract InternalStateOperations for creation of StateContext

* fix: the Errored status should also be capitalized (issue #313)

* fix: the complete actions should be called before the dispatch subscribe

Introduced an OrderedSubject extension of Subject that forces subscribers to receive values in the order that they were pushed
This allowed us to remove the delay(0) hack

* refactor: clean up Dispatcher code

* refactor: clean up StateFactory code

* docs: add API description for new OrderedSubject class

* fix: the action from the plugins should be used for the action stream
  • Loading branch information
markwhitfeld authored and amcdnl committed Apr 28, 2018
1 parent e080b8c commit 6337a19
Show file tree
Hide file tree
Showing 10 changed files with 445 additions and 182 deletions.
39 changes: 37 additions & 2 deletions packages/store/src/actions-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,54 @@ import { Observable, Subject } from 'rxjs';
export enum ActionStatus {
Dispatched = 'DISPATCHED',
Completed = 'COMPLETED',
Errored = 'Errored'
Canceled = 'CANCELED',
Errored = 'ERRORED'
}

export interface ActionContext {
status: ActionStatus;
action: any;
}

/**
* Custom Subject that ensures that subscribers are notified of values in the order that they arrived.
* A standard Subject does not have this guarantee.
* For example, given the following code:
* ```typescript
* const subject = new Subject<string>();
subject.subscribe(value => {
if (value === 'start') subject.next('end');
});
subject.subscribe(value => { });
subject.next('start');
* ```
* When `subject` is a standard `Subject<T>` the second subscriber would recieve `end` and then `start`.
* When `subject` is a `OrderedSubject<T>` the second subscriber would recieve `start` and then `end`.
*/
export class OrderedSubject<T> extends Subject<T> {
private _itemQueue: T[] = [];
private _busyPushingNext = false;

next(value?: T): void {
if (this._busyPushingNext) {
this._itemQueue.unshift(value);
return;
}
this._busyPushingNext = true;
super.next(value);
while (this._itemQueue.length > 0) {
const nextValue = this._itemQueue.pop();
super.next(nextValue);
}
this._busyPushingNext = false;
}
}

/**
* Internal Action stream that is emitted anytime an action is dispatched.
*/
@Injectable()
export class InternalActions extends Subject<ActionContext> {}
export class InternalActions extends OrderedSubject<ActionContext> {}

/**
* Action stream that is emitted anytime an action is dispatched.
Expand Down
101 changes: 101 additions & 0 deletions packages/store/src/dispatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { Injectable, ErrorHandler } from '@angular/core';
import { Observable, of, forkJoin, empty, Subject } from 'rxjs';
import { catchError, shareReplay, filter, exhaustMap, take } from 'rxjs/operators';

import { compose } from './compose';
import { InternalActions, ActionStatus, ActionContext } from './actions-stream';
import { StateStream } from './state-stream';
import { PluginManager } from './plugin-manager';

/**
* Internal Action result stream that is emitted when an action is completed.
* This is used as a method of returning the action result to the dispatcher
* for the observable returned by the dispatch(...) call.
* The dispatcher then asynchronously pushes the result from this stream onto the main action stream as a result.
*/
@Injectable()
export class InternalDispatchedActionResults extends Subject<ActionContext> {}

@Injectable()
export class InternalDispatcher {
constructor(
private _errorHandler: ErrorHandler,
private _actions: InternalActions,
private _actionResults: InternalDispatchedActionResults,
private _pluginManager: PluginManager,
private _stateStream: StateStream
) {}

/**
* Dispatches event(s).
*/
dispatch(event: any | any[]): Observable<any> {
let result: Observable<any>;

if (Array.isArray(event)) {
result = forkJoin(event.map(a => this.dispatchSingle(a)));
} else {
result = this.dispatchSingle(event);
}

result.pipe(
catchError(err => {
// handle error through angular error system
this._errorHandler.handleError(err);
return of(err);
})
);

result.subscribe();

return result;
}

private dispatchSingle(action: any): Observable<any> {
const prevState = this._stateStream.getValue();
const plugins = this._pluginManager.plugins;

return compose([
...plugins,
(nextState, nextAction) => {
if (nextState !== prevState) {
this._stateStream.next(nextState);
}
const actionResult$ = this.getActionResultStream(nextAction);
actionResult$.subscribe(ctx => this._actions.next(ctx));
this._actions.next({ action: nextAction, status: ActionStatus.Dispatched });
return this.createDispatchObservable(actionResult$);
}
])(prevState, action) as Observable<any>;
}

private getActionResultStream(action: any): Observable<ActionContext> {
const actionResult$ = this._actionResults.pipe(
filter((ctx: ActionContext) => {
return ctx.action === action && ctx.status !== ActionStatus.Dispatched;
}),
take(1),
shareReplay()
);
return actionResult$;
}

private createDispatchObservable(actionResult$: Observable<ActionContext>): Observable<any> {
return actionResult$
.pipe(
exhaustMap((ctx: ActionContext) => {
switch (ctx.status) {
case ActionStatus.Completed:
return of(this._stateStream.getValue());
case ActionStatus.Errored:
return of(this._stateStream.getValue()); // This was previously the error value
// I think that this should rather
// return throwError(new Error('the error goes here'))
default:
return empty();
}
})
)
.pipe(shareReplay());
}
}
8 changes: 5 additions & 3 deletions packages/store/src/internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ export interface ActionHandlerMetaData {
type: string;
}

export type GetStateFn<T> = () => T;
export type SetStateFn<T> = (newState: T) => void;
export type DispatchFn = (actions: any | any[]) => Observable<any>;
export interface InternalStateOperations<T> {
getState(): T;
setState(val: T);
dispatch(actions: any | any[]): Observable<void>;
}

export interface MetaDataModel {
name: string;
Expand Down
33 changes: 15 additions & 18 deletions packages/store/src/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { NgModule, ModuleWithProviders, Optional, Inject } from '@angular/core';
import { ROOT_STATE_TOKEN, FEATURE_STATE_TOKEN } from './symbols';
import { StateFactory } from './state-factory';
import { Actions, InternalActions } from './actions-stream';
import { InternalDispatcher, InternalDispatchedActionResults } from './dispatcher';
import { Store } from './store';
import { SelectFactory } from './select';
import { StateStream } from './state-stream';
Expand Down Expand Up @@ -35,16 +36,13 @@ export class NgxsRootModule {
stateStream.next({ ...cur, ...results.defaults });
}

store.dispatch(new InitState());
factory.connectActionHandlers(results.states);

if (results) {
factory.invokeInit(
() => stateStream.getValue(),
newState => stateStream.next(newState),
actions => store.dispatch(actions),
results.states
);
}
store.dispatch(new InitState()).subscribe(() => {
if (results) {
factory.invokeInit(results.states);
}
});
}
}

Expand Down Expand Up @@ -76,16 +74,13 @@ export class NgxsFeatureModule {
stateStream.next({ ...cur, ...results.defaults });
}

store.dispatch(new UpdateState());
factory.connectActionHandlers(results.states);

if (results) {
factory.invokeInit(
() => stateStream.getValue(),
newState => stateStream.next(newState),
actions => store.dispatch(actions),
results.states
);
}
store.dispatch(new UpdateState()).subscribe(() => {
if (results) {
factory.invokeInit(results.states);
}
});
}
}

Expand All @@ -104,6 +99,8 @@ export class NgxsModule {
StateFactory,
Actions,
InternalActions,
InternalDispatcher,
InternalDispatchedActionResults,
Store,
StateStream,
SelectFactory,
Expand Down
9 changes: 9 additions & 0 deletions packages/store/src/of-action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ export function ofActionCompleted(...allowedTypes: any[]) {
return ofActionOperator(allowedTypes, ActionStatus.Completed);
}

/**
* RxJS operator for selecting out specific actions.
*
* This will ONLY grab actions that have just been completed
*/
export function ofActionCanceled(...allowedTypes: any[]) {
return ofActionOperator(allowedTypes, ActionStatus.Canceled);
}

/**
* RxJS operator for selecting out specific actions.
*
Expand Down
Loading

0 comments on commit 6337a19

Please sign in to comment.