Skip to content

Commit 0d06b45

Browse files
committed
update the example code to be more resilient to backpressure
1 parent c241b73 commit 0d06b45

File tree

1 file changed

+1
-3
lines changed

1 file changed

+1
-3
lines changed

README.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ const queue = new BlockingQueue({ concurrency: 2 });
5050
let handled = 0;
5151
let failed = 0;
5252
let awaitDrain: Promise<void> | undefined;
53-
let realCount = 0;
5453

5554
const readStream = fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' });
5655
const jsonReadStream = JSONStream.parse('*');
@@ -81,7 +80,6 @@ const handleUser = async (user) => {
8180
await awaitDrain;
8281
awaitDrain = undefined;
8382
}
84-
realCount++;
8583
return queue.enqueue(addUserToDB, user).enqueuePromise;
8684
};
8785

@@ -99,7 +97,7 @@ const mapper = (user, cb) => {
9997
const onReadEnd = () => {
10098
console.log('done read streaming');
10199
// If nothing was written, idle event will not be fired
102-
if (realCount === 0) {
100+
if (queue.pendingCount === 0 && queue.activeCount === 0) {
103101
jsonWriteStream.end();
104102
} else {
105103
// Wait until all work is done

0 commit comments

Comments
 (0)