From 5874f25785008370d289f8d9f6980df90930c536 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Mon, 25 Nov 2013 16:32:56 +0100 Subject: [PATCH 1/2] fix prefix in streams by using a streams2 transform. #42 --- package.json | 6 ++++-- sub.js | 48 +++++++++++++++--------------------------------- 2 files changed, 19 insertions(+), 35 deletions(-) diff --git a/package.json b/package.json index 90a224b..9bd078a 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,8 @@ "level-hooks": ">=4.4.0 <5", "string-range": "~1.2.1", "level-fix-range": "2.0", - "xtend": "~2.0.4" + "xtend": "~2.0.4", + "readable-stream": "~1.1.9" }, "devDependencies": { "rimraf": "~2.1.4", @@ -23,7 +24,8 @@ "stream-to-pull-stream": "~1.2.0", "tape": "~1.0.4", "through": "~2.3.4", - "level": "~0.15.0" + "level": "~0.15.0", + "stream-to": "~0.3.1" }, "scripts": { "test": "set -e; for t in test/*.js; do node $t; done" diff --git a/sub.js b/sub.js index 71dbc37..80fb788 100644 --- a/sub.js +++ b/sub.js @@ -3,6 +3,7 @@ var inherits = require('util').inherits var ranges = require('string-range') var fixRange = require('level-fix-range') var xtend = require('xtend') +var Transform = require('readable-stream').Transform inherits(SubDB, EventEmitter) @@ -156,44 +157,25 @@ SDB.createReadStream = function (opts) { selectivelyMerge(_opts, xtend(opts, this._options)) var s = r.createReadStream(_opts) + if (_opts.keys === false) return s + + var tr = Transform({ objectMode: true }) + s.on('error', tr.emit.bind(tr, 'error')) + s.pipe(tr) if(_opts.values === false) { - var read = s.read - if (read) { - s.read = function (size) { - var val = read.call(this, size) - if (val) val = val.substring(p.length) - return val - } - } else { - var emit = s.emit - s.emit = function (event, val) { - if(event === 'data') { - emit.call(this, 'data', val.substring(p.length)) - } else - emit.call(this, event, val) - } + tr._transform = function (key, enc, done) { + key = key.substring(p.length) + done(null, key) } - return s - } else if(_opts.keys === false) - return s - else { - var read = s.read - if (read) { - s.read = function (size) { - var d = read.call(this, size) - if (d) d.key = d.key.substring(p.length) - return d - } - } else { - s.on('data', function (d) { - //mutate the prefix! - //this doesn't work for createKeyStream admittedly. - d.key = d.key.substring(p.length) - }) + } else { + tr._transform = function (data, enc, done) { + data.key = data.key.substring(p.length) + done(null, data) } - return s } + + return tr } From a4f07bf415b6a9ad10ab59dd731a9e449ea661bf Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Mon, 25 Nov 2013 16:34:47 +0100 Subject: [PATCH 2/2] fix key-value-stream test by using stream-to --- test/key-value-stream.js | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/test/key-value-stream.js b/test/key-value-stream.js index 3e967c0..bb631d2 100644 --- a/test/key-value-stream.js +++ b/test/key-value-stream.js @@ -1,7 +1,7 @@ -var pl = require('pull-level') -var pull = require('pull-stream') -var toPull = require('stream-to-pull-stream') +var pl = require('pull-level') +var pull = require('pull-stream') +var streamTo = require('stream-to'); var level = require('level-test')() var sublevel = require('../') @@ -21,25 +21,23 @@ tape('keys', function (t) { throw err } - toPull(db.createKeyStream()) - .pipe(pull.collect(function (err, ary) { + streamTo.array(db.createKeyStream(), function (err, ary) { + console.log(ary) + ary.forEach(function (e) { + t.equal(typeof e, 'string') + t.ok(/^key_/.test(e)) + }) + streamTo.array(db.createValueStream(), function (err, ary) { console.log(ary) ary.forEach(function (e) { t.equal(typeof e, 'string') - t.ok(/^key_/.test(e)) + t.ok(/^value_/.test(e)) + console.log(e) }) - toPull(db.createValueStream()) - .pipe(pull.collect(function (err, ary) { - console.log(ary) - ary.forEach(function (e) { - t.equal(typeof e, 'string') - t.ok(/^value_/.test(e)) - console.log(e) - }) - - t.end() - })) - })) + + t.end() + }) + }) })) })