diff --git a/package.json b/package.json index ee4de4d..95308d2 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,7 @@ }, "license": "MIT", "peerDependencies": { - "most": "*" + "@most/core": "*" }, "devDependencies": { "babel-cli": "^6.7.5", @@ -36,8 +36,7 @@ "eslint": "^3.19.0", "mkdirp": "^0.5.1", "mocha": "^3.3.0", - "most": "*", - "most-test": "^1.3.0", + "@most/core": "*", "power-assert": "^1.4.1", "rimraf": "^2.6.1" } diff --git a/src/index.js b/src/index.js index 232e0df..c097eb8 100644 --- a/src/index.js +++ b/src/index.js @@ -5,6 +5,7 @@ If not supplied, `f` defaults to an identity function, i.e. x => x */ +const {newStream} = require('@most/core'); import DispatchSource from './source'; function dispatch(f, stream) { @@ -13,9 +14,9 @@ function dispatch(f, stream) { return stream; } const source = new DispatchSource(stream, f); - const newStream = new stream.constructor(source); - newStream.select = key => source.select(key); - return newStream; + const streamNew = newStream(source.run.bind(source)); + streamNew.select = key => source.select(key); + return streamNew; }; return stream ? dispatcher(stream) : dispatcher; } diff --git a/src/pipe.js b/src/pipe.js new file mode 100644 index 0000000..bdc10ff --- /dev/null +++ b/src/pipe.js @@ -0,0 +1,17 @@ +export class Pipe { + constructor(sink) { + this.sink = sink; + } + + event(t, x) { + return this.sink.event(t, x); + } + + end(t) { + return this.sink.end(t); + } + + error(t, e) { + return this.sink.error(t, e); + } +} diff --git a/src/source.js b/src/source.js index d40b693..8d8ed9f 100644 --- a/src/source.js +++ b/src/source.js @@ -1,4 +1,6 @@ -import Pipe from 'most/lib/sink/Pipe'; +const {newStream} = require('@most/core'); + +import {Pipe} from './pipe'; import {DispatchDisposable, emptyDisposable, dispose} from './dispose'; import {Store} from './store'; import {tryEvent, tryEnd} from './try'; @@ -41,12 +43,12 @@ export default class DispatchSource { select(key, initial) { const source = new TargetSource(this, key); - return new this.stream.constructor(source); + return newStream(source.run.bind(source)); } add(sink, scheduler, key) { if(this._store.add(key, sink)) { - this._disposable = this.stream.source.run(this, scheduler); + this._disposable = this.stream.run(this, scheduler); } return new DispatchDisposable(this, sink, key); } diff --git a/test/helpers/reduce.js b/test/helpers/reduce.js new file mode 100644 index 0000000..86606bd --- /dev/null +++ b/test/helpers/reduce.js @@ -0,0 +1,40 @@ +/** @license MIT License (c) copyright 2010-2016 original author or authors */ +/** @author Brian Cavalier */ +/** @author John Hann */ + +const {run} = require('@most/core'); +// import { tap } from '../../src/combinator/transform' +import {newDefaultScheduler} from '@most/scheduler'; + +/** + * Reduce a stream to produce a single result. Note that reducing an infinite + * stream will return a Promise that never fulfills, but that may reject if an error + * occurs. + * @param {function(result:*, x:*):*} f reducer function + * @param {*} initial initial value + * @param {Stream} stream to reduce + * @returns {Promise} promise for the final result of the reduce + */ +export function reduce(f, initial, stream) { + return new Promise((resolve, reject) => { + run(new ReduceSink(f, initial, resolve, reject), newDefaultScheduler(), stream); + }); +} + +class ReduceSink { + constructor(f, value, resolve, reject) { + this.f = f; + this.value = value; + this.resolve = resolve; + this.reject = reject; + } + event(t, x) { + this.value = this.f(this.value, x); + } + error(t, e) { + this.reject(e); + } + end(t) { + this.resolve(this.value); + } +} diff --git a/test/helpers/stream-helper.js b/test/helpers/stream-helper.js new file mode 100644 index 0000000..bc2ab9b --- /dev/null +++ b/test/helpers/stream-helper.js @@ -0,0 +1,21 @@ +import {eq} from '@briancavalier/assert'; +import {reduce} from './reduce'; + +export function assertSame(s1, s2) { + return Promise.all([toArray(s1), toArray(s2)]).then(arrayEquals); +} + +export function expectArray(array, s) { + return toArray(s).then(eq(array)); +} + +function toArray(s) { + return reduce(function(a, x) { + a.push(x); + return a; + }, [], s); +} + +function arrayEquals(ss) { + eq(ss[0], ss[1]); +} diff --git a/test/test-dispatch.js b/test/test-dispatch.js index 6ca68c6..7ca4ce7 100644 --- a/test/test-dispatch.js +++ b/test/test-dispatch.js @@ -1,6 +1,11 @@ const assert = require('power-assert'); const {dispatch} = require('../src'); -const {Stream, empty, from, filter, merge, loop, map, join, reduce} = require('most'); +const {Empty, empty, withItems, periodic, filter, merge, loop, map, join} = require('@most/core'); +const {reduce} = require('./helpers/reduce'); + +function from(inputs) { + return withItems(inputs)(periodic(0)); +} describe('[most-dispatch]', () => { describe('dispatch()', () => { @@ -16,16 +21,24 @@ describe('[most-dispatch]', () => { const a$ = empty(); const b$ = d(a$); - assert(b$ instanceof Stream); + assert("run" in b$); }); + const arraySink = () => { + const result = []; + return { + error: (t, err) => undefined, + event: (t, evt) => console.log(evt) && result.push(evt), + end: (t) => undefined, + result + }; + }; it('emits the input values combined as a tuple with a selector function', () => { const d = dispatch(x => x); const inputs = [3, 5, 7]; const a$ = from(inputs); const b$ = d(a$); - return b$ - .reduce((arr, x) => (arr.push(x), arr), []) + return reduce((arr, x) => (arr.push(x), arr), [], b$) .then(values => { assert(values.length === 3); values.forEach((v, i) => { @@ -50,11 +63,10 @@ describe('[most-dispatch]', () => { ]; const a$ = from(inputs); const b$ = d(a$); - const c$ = b$.select(3).map(x => ({p: x.b})); - const d$ = b$.select(5).map(x => ({q: x.b})); + const c$ = map(x => ({p: x.b}), b$.select(3)); + const d$ = map(x => ({q: x.b}), b$.select(5)); const e$ = merge(c$, d$); - return e$ - .reduce((arr, x) => (arr.push(x), arr), []) + return reduce((arr, x) => (arr.push(x), arr), [], e$) .then(values => { assert.deepEqual(values, [ {p: 100}, @@ -105,4 +117,4 @@ describe('[most-dispatch]', () => { }); }); }); -}); \ No newline at end of file +}); diff --git a/yarn.lock b/yarn.lock index 39fef12..9787ba9 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2,21 +2,41 @@ # yarn lockfile v1 -"@most/hold@^2.0.0": - version "2.0.0" - resolved "https://registry.yarnpkg.com/@most/hold/-/hold-2.0.0.tgz#8e2328f01fcf5e08ff93343567e0d3dd50fcc2e2" +"@most/core@*": + version "1.5.0" + resolved "https://registry.yarnpkg.com/@most/core/-/core-1.5.0.tgz#b36b3620724e37ac3552659c1d71948958beb345" + integrity sha512-cI8KTkjaipRVQvf4vrwhqNtY7cTzF9YaI85TU6mHEgjRMeTSS9qgBv6z6wXUePgnbwf11uVgSERZ8PZRDPE2HA== dependencies: - "@most/multicast" "^1.2.4" + "@most/disposable" "^1.2.2" + "@most/prelude" "^1.7.3" + "@most/scheduler" "^1.2.3" + "@most/types" "^1.0.2" -"@most/multicast@^1.2.4", "@most/multicast@^1.2.5": - version "1.2.5" - resolved "https://registry.yarnpkg.com/@most/multicast/-/multicast-1.2.5.tgz#ba5abc997f9a6511094bec117914f4959720a8fb" +"@most/disposable@^1.2.2": + version "1.2.2" + resolved "https://registry.yarnpkg.com/@most/disposable/-/disposable-1.2.2.tgz#1f9dfcc1b9d73165436eee34ddedc2e70d7be695" + integrity sha512-05u3obo0sDuh1PGw1BS6VEhgJTLkPMyGHOAvSJMSk2tArgBvhMvjayl8fU4DEKO7r+3aTg7F9Stnr1T61578yQ== dependencies: - "@most/prelude" "^1.4.0" + "@most/prelude" "^1.7.3" + "@most/types" "^1.0.2" -"@most/prelude@^1.4.0": - version "1.6.0" - resolved "https://registry.yarnpkg.com/@most/prelude/-/prelude-1.6.0.tgz#4256e3a902ddf04c1f07afca2267526195072e13" +"@most/prelude@^1.7.3": + version "1.7.3" + resolved "https://registry.yarnpkg.com/@most/prelude/-/prelude-1.7.3.tgz#51db3f3ba3ed65431b6eea89ecb0a31826af640c" + integrity sha512-qWWEnA22UP1lzFfKx75XMut6DUUXGRKe7qv2k+Bgs7ju8lwb5RjsZYyQZ+VcsYvHcIavHKzseLlBMLOe2CvUZw== + +"@most/scheduler@^1.2.3": + version "1.2.3" + resolved "https://registry.yarnpkg.com/@most/scheduler/-/scheduler-1.2.3.tgz#7cb97904a23bbfcb2664c60ea4fe047d7b5572f7" + integrity sha512-OpykYNwUIe7/InAs0ftSIkQULJajM6ghmYDeJ0yv9xcLmi5NyAtOOyUdIxlfqi7xHWRiD2iQuuJXHGyqFsKCsw== + dependencies: + "@most/prelude" "^1.7.3" + "@most/types" "^1.0.2" + +"@most/types@^1.0.2": + version "1.0.2" + resolved "https://registry.yarnpkg.com/@most/types/-/types-1.0.2.tgz#a272c919a3dafe942bd02d63402f0548593af870" + integrity sha512-ZVkDwaiuGVTXywADeJ3aUBsD41UURldIljlsSTMdiWfhVEOY7dsY8iq+9wFUjRZYjhnpn9tLiOHaqyjbxm1prg== abbrev@1: version "1.1.0" @@ -1436,18 +1456,6 @@ mocha@^3.3.0: mkdirp "0.5.1" supports-color "3.1.2" -most-test@^1.3.0: - version "1.3.0" - resolved "https://registry.yarnpkg.com/most-test/-/most-test-1.3.0.tgz#22b73c68329d487d78e11ad95451b38cf8a77c13" - -most@*: - version "1.3.0" - resolved "https://registry.yarnpkg.com/most/-/most-1.3.0.tgz#148f96c311ce26cace63a179d10dd61dacee58f4" - dependencies: - "@most/multicast" "^1.2.5" - "@most/prelude" "^1.4.0" - symbol-observable "^1.0.2" - ms@0.7.1: version "0.7.1" resolved "https://registry.yarnpkg.com/ms/-/ms-0.7.1.tgz#9cd13c03adbff25b65effde7ce864ee952017098" @@ -2005,10 +2013,6 @@ supports-color@^2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/supports-color/-/supports-color-2.0.0.tgz#535d045ce6b6363fa40117084629995e9df324c7" -symbol-observable@^1.0.2: - version "1.0.4" - resolved "https://registry.yarnpkg.com/symbol-observable/-/symbol-observable-1.0.4.tgz#29bf615d4aa7121bdd898b22d4b3f9bc4e2aa03d" - table@^3.7.8: version "3.8.3" resolved "https://registry.yarnpkg.com/table/-/table-3.8.3.tgz#2bbc542f0fda9861a755d3947fefd8b3f513855f"