Skip to content

Commit c241b73

Browse files
committed
update the example code to be more reslient to backpressure
1 parent 3b5970c commit c241b73

File tree

1 file changed

+65
-44
lines changed

1 file changed

+65
-44
lines changed

README.md

Lines changed: 65 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -49,71 +49,92 @@ import { BlockingQueue } from 'promise-blocking-queue';
4949
const queue = new BlockingQueue({ concurrency: 2 });
5050
let handled = 0;
5151
let failed = 0;
52+
let awaitDrain: Promise<void> | undefined;
53+
let realCount = 0;
5254

5355
const readStream = fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' });
5456
const jsonReadStream = JSONStream.parse('*');
5557
const jsonWriteStream = JSONStream.stringify();
5658
const writeStream = fs.createWriteStream('./results.json');
5759

58-
const logFailed = () => {
59-
console.log(`failed ${++failed}`);
60+
const addUserToDB = async (user) => {
61+
try {
62+
console.log(`adding ${user.username}`);
63+
// Simulate long running task
64+
await sleep((handled + 1) * 100);
65+
console.log(`added ${user.username} #${++handled}`);
66+
const writePaused = !jsonWriteStream.write(user.username);
67+
if (writePaused && !awaitDrain) {
68+
// Down stream asked to pause the writes for now
69+
awaitDrain = new Promise((resolve) => {
70+
jsonWriteStream.once('drain', resolve);
71+
});
72+
}
73+
} catch (err) {
74+
console.log(`failed ${++failed}`, err);
75+
}
6076
};
6177

62-
const logAddedUser = (username) => () => {
63-
console.log(`added ${username} #${++handled}`);
64-
jsonWriteStream.write(username);
65-
};
66-
67-
const addUserToDB = (user) => {
68-
console.log(`adding ${user.username}`);
69-
// Simulate long running task
70-
return sleep((handled + 1) * 100).then(logAddedUser(user.username));
78+
const handleUser = async (user) => {
79+
// Wait until the down stream is ready to receive more data without increasing the memory footprint
80+
if (awaitDrain) {
81+
await awaitDrain;
82+
awaitDrain = undefined;
83+
}
84+
realCount++;
85+
return queue.enqueue(addUserToDB, user).enqueuePromise;
7186
};
7287

88+
// Do not use async!
7389
const mapper = (user, cb) => {
74-
console.log(`streamed ${user.username}`);
75-
const qResult = queue.enqueue(addUserToDB, user);
76-
qResult.fnPromise.catch(logFailed);
77-
// Continue streaming only after current item handling starts
78-
qResult.enqueuePromise.then(cb, cb);
79-
return false;
90+
console.log(`streamed ${user.username}`);
91+
handleUser(user)
92+
.then(() => {
93+
cb();
94+
});
95+
// Pause the read stream until we are ready to handle more data
96+
return false;
8097
};
8198

82-
// tslint:disable-next-line:no-empty
83-
const noop = () => {};
84-
8599
const onReadEnd = () => {
86-
console.log('done read streaming');
87-
// Wait until all work is done
88-
queue.on('idle', () => {
89-
jsonWriteStream.end();
90-
});
100+
console.log('done read streaming');
101+
// If nothing was written, idle event will not be fired
102+
if (realCount === 0) {
103+
jsonWriteStream.end();
104+
} else {
105+
// Wait until all work is done
106+
queue.on('idle', () => {
107+
jsonWriteStream.end();
108+
});
109+
}
91110
};
92111

93112
const onWriteEnd = () => {
94-
console.log(`done processing - ${handled} handled, ${failed} failed`);
95-
process.exit(0);
113+
console.log(`done processing - ${handled} handled, ${failed} failed`);
114+
process.exit(0);
96115
};
97116

98117
jsonWriteStream
99-
.pipe(writeStream)
100-
.on('error', (err) => {
101-
console.log('error wrtie streaming', err);
102-
process.exit(1);
103-
})
104-
.on('end', onWriteEnd)
105-
.on('finish', onWriteEnd);
118+
.pipe(writeStream)
119+
.on('error', (err) => {
120+
console.log('error wrtie streaming', err);
121+
process.exit(1);
122+
})
123+
.on('end', onWriteEnd)
124+
.on('finish', onWriteEnd);
106125

107126
readStream
108-
.pipe(jsonReadStream)
109-
.pipe(es.map(mapper))
110-
.on('data', noop)
111-
.on('error', (err) => {
112-
console.log('error read streaming', err);
113-
process.exit(1);
114-
})
115-
.on('finish', onReadEnd)
116-
.on('end', onReadEnd);
127+
.pipe(jsonReadStream)
128+
.pipe(es.map(mapper))
129+
.on('data', () => {
130+
// Do nothing
131+
})
132+
.on('error', (err) => {
133+
console.log('error read streaming', err);
134+
process.exit(1);
135+
})
136+
.on('finish', onReadEnd)
137+
.on('end', onReadEnd);
117138
```
118139

119140
If `users.json` is like:
@@ -144,8 +165,8 @@ streamed b
144165
adding b
145166
streamed c // c now waits in line to start and streaming is paused until then
146167
added a #1
147-
streamed d // d only get streamed after c has a spot in the queue
148168
adding c // c only gets handled after a is done
169+
streamed d // d only get streamed after c has a spot in the queue
149170
added b #2
150171
adding d // d only gets handled after b is done
151172
done read streaming

0 commit comments

Comments
 (0)