Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Action stream consolidation #324

Merged
merged 17 commits into from
Apr 28, 2018
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions packages/store/src/actions-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,39 @@ import { Observable, Subject } from 'rxjs';
export enum ActionStatus {
Dispatched = 'DISPATCHED',
Completed = 'COMPLETED',
Errored = 'Errored'
Canceled = 'CANCELED',
Errored = 'ERRORED'
}

export interface ActionContext {
status: ActionStatus;
action: any;
}

export class OrderedSubject<T> extends Subject<T> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@deebloo I think you might like this.

private _itemQueue: T[] = [];
private _busyPushingNext = false;

next(value?: T): void {
if (this._busyPushingNext) {
this._itemQueue.unshift(value);
return;
}
this._busyPushingNext = true;
super.next(value);
while (this._itemQueue.length > 0) {
const nextValue = this._itemQueue.pop();
super.next(nextValue);
}
this._busyPushingNext = false;
}
}

/**
* Internal Action stream that is emitted anytime an action is dispatched.
*/
@Injectable()
export class InternalActions extends Subject<ActionContext> {}
export class InternalActions extends OrderedSubject<ActionContext> {}

/**
* Action stream that is emitted anytime an action is dispatched.
Expand Down
97 changes: 97 additions & 0 deletions packages/store/src/dispatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { Injectable, ErrorHandler } from '@angular/core';
import { Observable, of, forkJoin, empty, Subject } from 'rxjs';
import { catchError, shareReplay, filter, exhaustMap, take } from 'rxjs/operators';

import { compose } from './compose';
import { InternalActions, ActionStatus, ActionContext } from './actions-stream';
import { StateStream } from './state-stream';
import { PluginManager } from './plugin-manager';

/**
* Internal Action result stream that is emitted when an action is completed.
* This is used as a method of returning the action result to the dispatcher
* for the observable returned by the dispatch(...) call.
* The dispatcher then asynchronously pushes the result from this stream onto the main action stream as a result.
*/
@Injectable()
export class InternalDispatchedActionResults extends Subject<ActionContext> {}

@Injectable()
export class InternalDispatcher {
constructor(
private _errorHandler: ErrorHandler,
private _actions: InternalActions,
private _actionResults: InternalDispatchedActionResults,
private _pluginManager: PluginManager,
private _stateStream: StateStream
) {}

/**
* Dispatches event(s).
*/
dispatch(event: any | any[]): Observable<any> {
let result: Observable<any>;

if (Array.isArray(event)) {
result = forkJoin(event.map(a => this._dispatch(a)));
} else {
result = this._dispatch(event);
}

result.pipe(
catchError(err => {
// handle error through angular error system
this._errorHandler.handleError(err);
return of(err);
})
);

result.subscribe();

return result;
}

private _dispatch(action: any): Observable<any> {
const prevState = this._stateStream.getValue();
const plugins = this._pluginManager.plugins;

return compose([
...plugins,
(nextState, nextAction) => {
if (nextState !== prevState) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the complexity of this function now, we should break it out into its own method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

this._stateStream.next(nextState);
}

const actionResult$ = this._actionResults.pipe(
filter((ctx: ActionContext) => {
return ctx.action === nextAction && ctx.status !== ActionStatus.Dispatched;
}),
take(1),
shareReplay()
);

actionResult$.subscribe(ctx => {
this._actions.next(ctx);
});
this._actions.next({ action: nextAction, status: ActionStatus.Dispatched });

return actionResult$
.pipe(
exhaustMap((ctx: ActionContext) => {
switch (ctx.status) {
case ActionStatus.Completed:
return of(this._stateStream.getValue());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the subscribe result should not return a value. The reason is was before was the value is needed for the plugins next().subscribe(state). Not sure how we can make the dispatch().subscribe() not return anything but make the next().subscribe(state=> work too. cc/ @deebloo

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amcdnl yeah we would have to change plugins. Which I am certainly open to

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that I will have to leave it like this for now so that I don't break the plugins.

case ActionStatus.Errored:
return of(this._stateStream.getValue()); // This was previously the error value
// I think that this should rather
// return throwError(new Error('the error goes here'))
default:
return empty();
}
})
)
.pipe(shareReplay());
}
])(prevState, action) as Observable<any>;
}
}
8 changes: 5 additions & 3 deletions packages/store/src/internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ export interface ActionHandlerMetaData {
type: string;
}

export type GetStateFn<T> = () => T;
export type SetStateFn<T> = (newState: T) => void;
export type DispatchFn = (actions: any | any[]) => Observable<any>;
export interface InternalStateOperations<T> {
getState(): T;
setState(val: T);
dispatch(actions: any | any[]): Observable<void>;
}

export interface MetaDataModel {
name: string;
Expand Down
33 changes: 15 additions & 18 deletions packages/store/src/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { NgModule, ModuleWithProviders, Optional, Inject } from '@angular/core';
import { ROOT_STATE_TOKEN, FEATURE_STATE_TOKEN } from './symbols';
import { StateFactory } from './state-factory';
import { Actions, InternalActions } from './actions-stream';
import { InternalDispatcher, InternalDispatchedActionResults } from './dispatcher';
import { Store } from './store';
import { SelectFactory } from './select';
import { StateStream } from './state-stream';
Expand Down Expand Up @@ -34,16 +35,13 @@ export class NgxsRootModule {
stateStream.next({ ...cur, ...results.defaults });
}

store.dispatch(new InitState());
factory.connectActionHandlers(results.states);

if (results) {
factory.invokeInit(
() => stateStream.getValue(),
newState => stateStream.next(newState),
actions => store.dispatch(actions),
results.states
);
}
store.dispatch(new InitState()).subscribe(() => {
if (results) {
factory.invokeInit(results.states);
}
});
}
}

Expand Down Expand Up @@ -75,16 +73,13 @@ export class NgxsFeatureModule {
stateStream.next({ ...cur, ...results.defaults });
}

store.dispatch(new UpdateState());
factory.connectActionHandlers(results.states);

if (results) {
factory.invokeInit(
() => stateStream.getValue(),
newState => stateStream.next(newState),
actions => store.dispatch(actions),
results.states
);
}
store.dispatch(new UpdateState()).subscribe(() => {
if (results) {
factory.invokeInit(results.states);
}
});
}
}

Expand All @@ -103,6 +98,8 @@ export class NgxsModule {
StateFactory,
Actions,
InternalActions,
InternalDispatcher,
InternalDispatchedActionResults,
Store,
StateStream,
SelectFactory,
Expand Down
9 changes: 9 additions & 0 deletions packages/store/src/of-action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ export function ofActionCompleted(...allowedTypes: any[]) {
return ofActionOperator(allowedTypes, ActionStatus.Completed);
}

/**
* RxJS operator for selecting out specific actions.
*
* This will ONLY grab actions that have just been completed
*/
export function ofActionCanceled(...allowedTypes: any[]) {
return ofActionOperator(allowedTypes, ActionStatus.Canceled);
}

/**
* RxJS operator for selecting out specific actions.
*
Expand Down
Loading