Skip to content

Commit 0712672

Browse files
authored
fix: add backpressure to tls encryption (#3054)
Use a queueless pushable which will wait for the consumer to consume a buffer before accepting another. Pauses incoming streams until the application has read the data. This should reduce memory usage.
1 parent b2124c2 commit 0712672

File tree

2 files changed

+64
-48
lines changed

2 files changed

+64
-48
lines changed

packages/connection-encrypter-tls/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
"@peculiar/webcrypto": "^1.5.0",
5757
"@peculiar/x509": "^1.12.3",
5858
"asn1js": "^3.0.5",
59-
"it-pushable": "^3.2.3",
59+
"it-queueless-pushable": "^1.0.2",
6060
"it-stream-types": "^2.0.2",
6161
"protons-runtime": "^5.5.0",
6262
"uint8arraylist": "^2.4.8",

packages/connection-encrypter-tls/src/utils.ts

+63-47
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@ import * as asn1X509 from '@peculiar/asn1-x509'
77
import { Crypto } from '@peculiar/webcrypto'
88
import * as x509 from '@peculiar/x509'
99
import * as asn1js from 'asn1js'
10-
import { pushable } from 'it-pushable'
10+
import { queuelessPushable } from 'it-queueless-pushable'
1111
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
1212
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
1313
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
1414
import { InvalidCertificateError } from './errors.js'
1515
import { KeyType, PublicKey } from './pb/index.js'
1616
import type { PeerId, PublicKey as Libp2pPublicKey, Logger, PrivateKey } from '@libp2p/interface'
17-
import type { Duplex } from 'it-stream-types'
17+
import type { Pushable } from 'it-queueless-pushable'
18+
import type { Duplex, Source } from 'it-stream-types'
1819
import type { Uint8ArrayList } from 'uint8arraylist'
1920

2021
const crypto = new Crypto()
@@ -185,15 +186,19 @@ function formatAsPem (str: string): string {
185186
}
186187

187188
export function itToStream (conn: Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>>): DuplexStream {
188-
const output = pushable()
189+
const output = queuelessPushable<Uint8Array>()
189190
const iterator = conn.source[Symbol.asyncIterator]() as AsyncGenerator<Uint8Array>
190191

191192
const stream = new DuplexStream({
192193
autoDestroy: false,
193194
allowHalfOpen: true,
194195
write (chunk, encoding, callback) {
195-
output.push(chunk)
196-
callback()
196+
void output.push(chunk)
197+
.then(() => {
198+
callback()
199+
}, err => {
200+
callback(err)
201+
})
197202
},
198203
read () {
199204
iterator.next()
@@ -218,53 +223,64 @@ export function itToStream (conn: Duplex<AsyncGenerator<Uint8Array | Uint8ArrayL
218223
return stream
219224
}
220225

221-
export function streamToIt (stream: DuplexStream): Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> {
222-
const output: Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = {
223-
source: (async function * () {
224-
const output = pushable<Uint8Array>()
225-
226-
stream.addListener('data', (buf) => {
227-
output.push(buf.subarray())
228-
})
229-
// both ends closed
230-
stream.addListener('close', () => {
231-
output.end()
232-
})
233-
stream.addListener('error', (err) => {
234-
output.end(err)
235-
})
236-
// just writable end closed
237-
stream.addListener('finish', () => {
238-
output.end()
239-
})
240-
241-
try {
242-
yield * output
243-
} catch (err: any) {
244-
stream.destroy(err)
245-
throw err
246-
}
247-
})(),
248-
sink: async (source) => {
249-
try {
250-
for await (const buf of source) {
251-
const sendMore = stream.write(buf.subarray())
252-
253-
if (!sendMore) {
254-
await waitForBackpressure(stream)
255-
}
256-
}
226+
class DuplexIterable implements Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> {
227+
source: Pushable<Uint8Array>
228+
private readonly stream: DuplexStream
229+
230+
constructor (stream: DuplexStream) {
231+
this.stream = stream
232+
this.source = queuelessPushable<Uint8Array>()
257233

258-
// close writable end
259-
stream.end()
260-
} catch (err: any) {
261-
stream.destroy(err)
262-
throw err
234+
stream.addListener('data', (buf) => {
235+
stream.pause()
236+
this.source.push(buf.subarray())
237+
.then(() => {
238+
stream.resume()
239+
}, (err) => {
240+
stream.emit('error', err)
241+
})
242+
})
243+
// both ends closed
244+
stream.addListener('close', () => {
245+
this.source.end()
246+
.catch(err => {
247+
stream.emit('error', err)
248+
})
249+
})
250+
stream.addListener('error', (err) => {
251+
this.source.end(err)
252+
.catch(() => {})
253+
})
254+
// just writable end closed
255+
stream.addListener('finish', () => {
256+
this.source.end()
257+
.catch(() => {})
258+
})
259+
260+
this.sink = this.sink.bind(this)
261+
}
262+
263+
async sink (source: Source<Uint8Array | Uint8ArrayList>): Promise<void> {
264+
try {
265+
for await (const buf of source) {
266+
const sendMore = this.stream.write(buf.subarray())
267+
268+
if (!sendMore) {
269+
await waitForBackpressure(this.stream)
270+
}
263271
}
272+
273+
// close writable end
274+
this.stream.end()
275+
} catch (err: any) {
276+
this.stream.destroy(err)
277+
throw err
264278
}
265279
}
280+
}
266281

267-
return output
282+
export function streamToIt (stream: DuplexStream): Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> {
283+
return new DuplexIterable(stream)
268284
}
269285

270286
async function waitForBackpressure (stream: DuplexStream): Promise<void> {

0 commit comments

Comments
 (0)