Skip to content

Commit

Permalink
fix: Fix stream filter and empty batch handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
jheer committed Dec 13, 2024
1 parent f39b57b commit afc72f0
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 4 deletions.
3 changes: 2 additions & 1 deletion src/format/parse-json.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ function parseNDJSON(input, {

async function readNDJSON(stream, names) {
const iter = stream[Symbol.asyncIterator]();
const first = (await iter.next()).value;
let first;
do { first = (await iter.next()).value; } while (first.length === 0);

names ??= Object.keys(firstNonNull(first));
const cols = names.map(() => []);
Expand Down
4 changes: 2 additions & 2 deletions src/format/stream/line-filter-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ export class LineFilterStream extends TransformStream {
const n = chunk.length;
const bits = new BitSet(n);
for (let c = 0; c < chunk.length; ++c, ++i) {
if (drop(chunk[c], i)) bits.set(i);
if (drop(chunk[c], i)) bits.set(c);
}
controller.enqueue(
bits.count()
? chunk.filter((_, i) => !bits.get(i))
? chunk.filter((_, c) => !bits.get(c))
: chunk
);
}
Expand Down
3 changes: 2 additions & 1 deletion src/format/stream/parse-text-rows.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ function defaultNames(n, off = 0) {
*/
export async function parseTextRows(stream, options) {
const iter = stream[Symbol.asyncIterator]();
let batch = (await iter.next()).value;
let batch;
do { batch = (await iter.next()).value; } while (batch.length === 0);

const n = batch[0].length;
const automax = +options.autoMax || 1000;
Expand Down
34 changes: 34 additions & 0 deletions test/format/parse-csv-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,38 @@ describe('parseCSV', () => {
'csv parsed data with autoType false'
);
});

it('parses streaming data', async () => {
/** @type {ReadableStream<string>} */
const stream = new ReadableStream({
start(controller) {
controller.enqueue('skip line 1\n');
controller.enqueue('skip line 2\n');
controller.enqueue('a,b,c\n');
controller.enqueue('foo,"ba');
controller.enqueue('r",3.0');
controller.enqueue('\nbaz,"bop"');
controller.enqueue(',2.0\r');
controller.enqueue('\nque,"""rocky"');
controller.enqueue('" road",1.0');
controller.enqueue('\n');
controller.enqueue('# commented line\n');
controller.enqueue('who,"goes"');
controller.enqueue(',0.5');
controller.close();
},
pull() { },
cancel() { }
});

tableEqual(
await parseCSV(stream, { skip: 2, comment: '#' }),
{
a: ['foo', 'baz', 'que', 'who'],
b: ['bar', 'bop', '"rocky" road', 'goes'],
c: [3.0, 2.0, 1.0, 0.5]
},
'csv parsed from custom stream'
);
});
});

0 comments on commit afc72f0

Please sign in to comment.