From 04643c35e157d9e876682a07c8576d212226e1d2 Mon Sep 17 00:00:00 2001 From: Mark Whitfeld Date: Fri, 27 Apr 2018 21:53:44 +0200 Subject: [PATCH] refactor: clean up Dispatcher code --- packages/store/src/dispatcher.ts | 70 +++++++++++++++++--------------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/packages/store/src/dispatcher.ts b/packages/store/src/dispatcher.ts index 98bdc9133..a436e0cf4 100644 --- a/packages/store/src/dispatcher.ts +++ b/packages/store/src/dispatcher.ts @@ -33,9 +33,9 @@ export class InternalDispatcher { let result: Observable; if (Array.isArray(event)) { - result = forkJoin(event.map(a => this._dispatch(a))); + result = forkJoin(event.map(a => this.dispatchSingle(a))); } else { - result = this._dispatch(event); + result = this.dispatchSingle(event); } result.pipe( @@ -51,7 +51,7 @@ export class InternalDispatcher { return result; } - private _dispatch(action: any): Observable { + private dispatchSingle(action: any): Observable { const prevState = this._stateStream.getValue(); const plugins = this._pluginManager.plugins; @@ -61,37 +61,41 @@ export class InternalDispatcher { if (nextState !== prevState) { this._stateStream.next(nextState); } - - const actionResult$ = this._actionResults.pipe( - filter((ctx: ActionContext) => { - return ctx.action === nextAction && ctx.status !== ActionStatus.Dispatched; - }), - take(1), - shareReplay() - ); - - actionResult$.subscribe(ctx => { - this._actions.next(ctx); - }); - this._actions.next({ action: nextAction, status: ActionStatus.Dispatched }); - - return actionResult$ - .pipe( - exhaustMap((ctx: ActionContext) => { - switch (ctx.status) { - case ActionStatus.Completed: - return of(this._stateStream.getValue()); - case ActionStatus.Errored: - return of(this._stateStream.getValue()); // This was previously the error value - // I think that this should rather - // return throwError(new Error('the error goes here')) - default: - return empty(); - } - }) - ) - .pipe(shareReplay()); + const actionResult$ = this.getActionResultStream(action); + actionResult$.subscribe(ctx => this._actions.next(ctx)); + this._actions.next({ action: action, status: ActionStatus.Dispatched }); + return this.createDispatchObservable(actionResult$); } ])(prevState, action) as Observable; } + + private getActionResultStream(action: any): Observable { + const actionResult$ = this._actionResults.pipe( + filter((ctx: ActionContext) => { + return ctx.action === action && ctx.status !== ActionStatus.Dispatched; + }), + take(1), + shareReplay() + ); + return actionResult$; + } + + private createDispatchObservable(actionResult$: Observable): Observable { + return actionResult$ + .pipe( + exhaustMap((ctx: ActionContext) => { + switch (ctx.status) { + case ActionStatus.Completed: + return of(this._stateStream.getValue()); + case ActionStatus.Errored: + return of(this._stateStream.getValue()); // This was previously the error value + // I think that this should rather + // return throwError(new Error('the error goes here')) + default: + return empty(); + } + }) + ) + .pipe(shareReplay()); + } }