Skip to content

Commit

Permalink
WiP: filter().
Browse files Browse the repository at this point in the history
  • Loading branch information
uhop committed Dec 11, 2024
1 parent 7dd3581 commit 3c82145
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 101 deletions.
134 changes: 33 additions & 101 deletions src/filters/filter.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,131 +2,63 @@

'use strict';

const FilterBase = require('./filter-base');
const withParser = require('../utils/with-parser');
const {many} = require('stream-chain');

class Filter extends FilterBase {
static make(options) {
return new Filter(options);
}

static withParser(options) {
return withParser(Filter.make, options);
}
const filterBase = require('./filter-base.js');
const withParser = require('../utils/with-parser.js');

constructor(options) {
super(options);
this._once = false;
this._lastStack = [];
}

_flush(callback) {
this._syncStack();
callback(null);
}

_checkChunk(chunk) {
switch (chunk.name) {
case 'startObject':
if (this._filter(this._stack, chunk)) {
this._syncStack();
this.push(chunk);
this._lastStack.push(null);
}
break;
case 'startArray':
if (this._filter(this._stack, chunk)) {
this._syncStack();
this.push(chunk);
this._lastStack.push(-1);
}
break;
case 'nullValue':
case 'trueValue':
case 'falseValue':
case 'stringValue':
case 'numberValue':
if (this._filter(this._stack, chunk)) {
this._syncStack();
this.push(chunk);
}
break;
case 'startString':
if (this._filter(this._stack, chunk)) {
this._syncStack();
this.push(chunk);
this._transform = this._passString;
} else {
this._transform = this._skipString;
}
break;
case 'startNumber':
if (this._filter(this._stack, chunk)) {
this._syncStack();
this.push(chunk);
this._transform = this._passNumber;
} else {
this._transform = this._skipNumber;
}
break;
}
return false;
}

_syncStack() {
const stack = this._stack,
last = this._lastStack,
stackLength = stack.length,
lastLength = last.length;
const filter = filterBase({
checkAlways: true,
transition(previousStack, stack, chunk, options) {
const returnTokens = [];

// find the common part
let commonLength = 0;
for (const n = Math.min(stackLength, lastLength); commonLength < n && stack[commonLength] === last[commonLength]; ++commonLength);
for (const n = Math.min(stack.length, previousStack.length); commonLength < n && stack[commonLength] === previousStack[commonLength]; ++commonLength);

// close old objects
for (let i = lastLength - 1; i > commonLength; --i) {
this.push({name: typeof last[i] == 'number' ? 'endArray' : 'endObject'});
for (let i = previousStack.length - 1; i > commonLength; --i) {
returnTokens.push({name: typeof previousStack[i] == 'number' ? 'endArray' : 'endObject'});
}
if (commonLength < lastLength) {
if (commonLength < stackLength) {
if (commonLength < previousStack.length) {
if (commonLength < stack.length) {
if (typeof stack[commonLength] == 'string') {
const key = stack[commonLength];
if (this._streamKeys) {
this.push({name: 'startKey'});
this.push({name: 'stringChunk', value: key});
this.push({name: 'endKey'});
if (options?.streamKeys) {
returnTokens.push({name: 'startKey'}, {name: 'stringChunk', value: key}, {name: 'endKey'}, {name: 'keyValue', value: key});
} else {
returnTokens.push({name: 'keyValue', value: key});
}
this.push({name: 'keyValue', value: key});
}
++commonLength;
} else {
this.push({name: typeof last[commonLength] == 'number' ? 'endArray' : 'endObject'});
returnTokens.push({name: typeof previousStack[commonLength] == 'number' ? 'endArray' : 'endObject'});
}
}

// open new objects
for (let i = commonLength; i < stackLength; ++i) {
for (let i = commonLength; i < stack.length; ++i) {
const key = stack[i];
if (typeof key == 'number') {
if (key >= 0) {
this.push({name: 'startArray'});
}
if (key >= 0) returnTokens.push({name: 'startArray'});
} else if (typeof key == 'string') {
this.push({name: 'startObject'});
if (this._streamKeys) {
this.push({name: 'startKey'});
this.push({name: 'stringChunk', value: key});
this.push({name: 'endKey'});
if (options?.streamKeys) {
returnTokens.push({name: 'startObject'}, {name: 'startKey'}, {name: 'stringChunk', value: key}, {name: 'endKey'}, {name: 'keyValue', value: key});
} else {
returnTokens.push({name: 'startObject'}, {name: 'keyValue', value: key});
}
this.push({name: 'keyValue', value: key});
}
}

// update the last stack
this._lastStack = Array.prototype.concat.call(stack);
// add chunk
if (chunk) returnTokens.push(chunk);

return many(returnTokens);
}
}
Filter.filter = Filter.make;
Filter.make.Constructor = Filter;
});

module.exports = filter;
module.exports.filter = filter;

module.exports = Filter;
module.exports.withParser = options => withParser(filter, options);
module.exports.withParserAsStream = options => withParser.asStream(filter, options);
116 changes: 116 additions & 0 deletions tests/xtest-filter.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
'use strict';

import test from 'tape-six';
import chain from 'stream-chain';

import filter from '../src/filters/filter.js';

import readString from './read-string.mjs';

test.asPromise('filter', (t, resolve, reject) => {
const input = '{"a": 1, "b": true, "c": ["d"]}',
pipeline = chain([readString(input), filter.withParser({packKeys: true, packValues: false, filter: /^(a|c)$/})]),
result = [];

pipeline.on('data', chunk => result.push(chunk));
pipeline.on('error', reject);
pipeline.on('end', () => {
t.deepEqual(result, [
{name: 'startObject'},
{name: 'startKey'},
{name: 'stringChunk', value: 'a'},
{name: 'endKey'},
{name: 'keyValue', value: 'a'},
{name: 'startNumber'},
{name: 'numberChunk', value: '1'},
{name: 'endNumber'},
{name: 'startKey'},
{name: 'stringChunk', value: 'c'},
{name: 'endKey'},
{name: 'keyValue', value: 'c'},
{name: 'startArray'},
{name: 'endArray'},
{name: 'endObject'}
]);
resolve();
});
});

/*
unit.add(module, [
function test_filter_no_streaming(t) {
const async = t.startAsync('test_filter_no_streaming');
const input = '{"a": 1, "b": true, "c": ["d"]}',
pipeline = chain([
readString(input),
parser({packKeys: true, packValues: false, streamValues: false}),
filter({filter: /^(|a|c)$/, streamValues: false})
]),
result = [];
pipeline.on('data', chunk => result.push({name: chunk.name, val: chunk.value}));
pipeline.on('end', () => {
eval(t.ASSERT('result.length === 9'));
eval(t.TEST("result[0].name === 'startObject'"));
eval(t.TEST("result[1].name === 'keyValue' && result[1].val === 'a'"));
eval(t.TEST("result[2].name === 'startNumber'"));
eval(t.TEST("result[3].name === 'numberChunk' && result[3].val === '1'"));
eval(t.TEST("result[4].name === 'endNumber'"));
eval(t.TEST("result[5].name === 'keyValue' && result[5].val === 'c'"));
eval(t.TEST("result[6].name === 'startArray'"));
eval(t.TEST("result[7].name === 'endArray'"));
eval(t.TEST("result[8].name === 'endObject'"));
async.done();
});
},
function test_filter_deep(t) {
const async = t.startAsync('test_filter_deep');
const data = {a: {b: {c: 1}}, b: {b: {c: 2}}, c: {b: {c: 3}}};
const pipeline = chain([readString(JSON.stringify(data)), parser({streamValues: false}), filter({filter: /^(?:a|c)\.b\b/})]);
const asm = Assembler.connectTo(pipeline);
pipeline.on('end', () => {
eval(t.TEST('t.unify(asm.current, {a: {b: {c: 1}}, c: {b: {c: 3}}})'));
async.done();
});
},
function test_filter_array(t) {
const async = t.startAsync('test_filter_array');
const data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const pipeline = chain([
readString(JSON.stringify(data)),
parser(),
filter({
filter: stack => stack.length == 1 && typeof stack[0] == 'number' && stack[0] % 2
})
]);
const asm = Assembler.connectTo(pipeline);
pipeline.on('end', () => {
eval(t.TEST('t.unify(asm.current, [2, 4, 6, 8, 10])'));
async.done();
});
},
function test_filter_bug46(t) {
const async = t.startAsync('test_filter_bug46');
const data = [{data: {a: 1, b: 2}, x: 1}, {data: {a: 3, b: 4}, y: 2}];
const pipeline = chain([readString(JSON.stringify(data)), parser(), filter({filter: /data/})]);
const asm = Assembler.connectTo(pipeline);
pipeline.on('end', () => {
eval(t.TEST('t.unify(asm.current, [{data: {a: 1, b: 2}}, {data: {a: 3, b: 4}}])'));
async.done();
});
}
]);
*/

0 comments on commit 3c82145

Please sign in to comment.