Skip to content

Commit

Permalink
refactor: clean up Dispatcher code
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Whitfeld committed Apr 27, 2018
1 parent 4216345 commit 8da51af
Showing 1 changed file with 37 additions and 33 deletions.
70 changes: 37 additions & 33 deletions packages/store/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ export class InternalDispatcher {
let result: Observable<any>;

if (Array.isArray(event)) {
result = forkJoin(event.map(a => this._dispatch(a)));
result = forkJoin(event.map(a => this.dispatchSingle(a)));
} else {
result = this._dispatch(event);
result = this.dispatchSingle(event);
}

result.pipe(
Expand All @@ -51,7 +51,7 @@ export class InternalDispatcher {
return result;
}

private _dispatch(action: any): Observable<any> {
private dispatchSingle(action: any): Observable<any> {
const prevState = this._stateStream.getValue();
const plugins = this._pluginManager.plugins;

Expand All @@ -61,37 +61,41 @@ export class InternalDispatcher {
if (nextState !== prevState) {
this._stateStream.next(nextState);
}

const actionResult$ = this._actionResults.pipe(
filter((ctx: ActionContext) => {
return ctx.action === nextAction && ctx.status !== ActionStatus.Dispatched;
}),
take(1),
shareReplay()
);

actionResult$.subscribe(ctx => {
this._actions.next(ctx);
});
this._actions.next({ action: nextAction, status: ActionStatus.Dispatched });

return actionResult$
.pipe(
exhaustMap((ctx: ActionContext) => {
switch (ctx.status) {
case ActionStatus.Completed:
return of(this._stateStream.getValue());
case ActionStatus.Errored:
return of(this._stateStream.getValue()); // This was previously the error value
// I think that this should rather
// return throwError(new Error('the error goes here'))
default:
return empty();
}
})
)
.pipe(shareReplay());
const actionResult$ = this.getActionResultStream(action);
actionResult$.subscribe(ctx => this._actions.next(ctx));
this._actions.next({ action: action, status: ActionStatus.Dispatched });
return this.createDispatchObservable(actionResult$);
}
])(prevState, action) as Observable<any>;
}

private getActionResultStream(action: any): Observable<ActionContext> {
const actionResult$ = this._actionResults.pipe(
filter((ctx: ActionContext) => {
return ctx.action === action && ctx.status !== ActionStatus.Dispatched;
}),
take(1),
shareReplay()
);
return actionResult$;
}

private createDispatchObservable(actionResult$: Observable<ActionContext>): Observable<any> {
return actionResult$
.pipe(
exhaustMap((ctx: ActionContext) => {
switch (ctx.status) {
case ActionStatus.Completed:
return of(this._stateStream.getValue());
case ActionStatus.Errored:
return of(this._stateStream.getValue()); // This was previously the error value
// I think that this should rather
// return throwError(new Error('the error goes here'))
default:
return empty();
}
})
)
.pipe(shareReplay());
}
}

0 comments on commit 8da51af

Please sign in to comment.