From d1ade9271203d9c591db3dd39ef3e59fbf421061 Mon Sep 17 00:00:00 2001 From: Jeremiah Senkpiel Date: Thu, 6 Jun 2019 16:36:23 -0700 Subject: [PATCH] helpers: introduce Stream() Refs: https://github.com/Fishrock123/bob/issues/26 --- helpers/stream.js | 53 ++++++++++++++++++++++++++++++++++++ tests/file-to-file-test.js | 16 +++++++---- tests/zlib-transform-test.js | 18 +++++++----- 3 files changed, 74 insertions(+), 13 deletions(-) create mode 100644 helpers/stream.js diff --git a/helpers/stream.js b/helpers/stream.js new file mode 100644 index 0000000..9efa273 --- /dev/null +++ b/helpers/stream.js @@ -0,0 +1,53 @@ +'use strict' + +class Stream { + #source = null + #sink = null + #promise = null + #resolve = null + #reject = null + + constructor(source, ...sinks) { + this.#source = source + this.#sink = sinks[sinks.length - 1] + + let last = source + for (const sink of sinks.slice(0, sinks.length - 1)) { + sink.bindSource(last) + last = sink + } + + this.#sink.bindSource(last, error => { + if (this.#promise === null) { + throw error + } + if (error) { + this.#reject(error) + } else { + this.#resolve() + } + }) + } + + start() { + // If sink is undefined or does not have start(), a programmer error has been made. + return this.sink.start() + } + + stop() { + // If source is undefined or does not have stop(), a programmer error has been made. + return this.source.stop() + } + + then(_resolve, _reject) { + if (!this.#promise) { + this.#promise = new Promise((resolve, reject) => { + this.#resolve = resolve + this.#reject = reject + }) + } + return this.#promise.then(_resolve, _reject) + } +} + +module.exports = Stream diff --git a/tests/file-to-file-test.js b/tests/file-to-file-test.js index 0b185d7..9e31113 100644 --- a/tests/file-to-file-test.js +++ b/tests/file-to-file-test.js @@ -2,6 +2,7 @@ // node --expose-internals file-to-file-test.js ./fixtures/test +const Stream = require('../helpers/stream') const FileSource = require('fs-source') const FileSink = require('fs-sink') const PassThrough = require('../reference-passthrough') @@ -10,10 +11,13 @@ const fileSource = new FileSource(process.argv[2]) const fileSink = new FileSink(process.argv[2] + '_') const passThrough = new PassThrough() -fileSink.bindSource(passThrough.bindSource(fileSource), error => { - if (error) - console.error('ERROR!', error) - else { - console.log('done') - } +const stream = new Stream(fileSource, passThrough, fileSink) +try { + stream.start() +} catch (e) {} + +stream.then(resolved => { + console.log('done (resolved)') +}, rejected => { + console.error('ERROR! (rejected)', rejected) }) diff --git a/tests/zlib-transform-test.js b/tests/zlib-transform-test.js index 2ee3dd2..1ad5af8 100644 --- a/tests/zlib-transform-test.js +++ b/tests/zlib-transform-test.js @@ -4,6 +4,7 @@ const zlib = require('zlib') +const Stream = require('../helpers/stream') const FileSource = require('fs-source') const FileSink = require('fs-sink') const ZlibTransform = require('zlib-transform') @@ -12,11 +13,14 @@ const fileSource = new FileSource(process.argv[2]) const fileSink = new FileSink(process.argv[2] + '.gz') const zlibTransform = new ZlibTransform({}, zlib.constants.GZIP) -fileSink.bindSource(zlibTransform.bindSource(fileSource), error => { - if (error) { - console.error('ERROR!', error) - console.error((new Error()).stack) - } else { - console.log('done') - } +const stream = new Stream(fileSource, zlibTransform, fileSink) +try { + stream.start() +} catch (e) {} + +stream.then(resolved => { + console.log('done (resolved)') +}, rejected => { + console.error('ERROR! (rejected)', rejected) + console.error((new Error()).stack) })