Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 373f69e

Browse files
committed
fix(gateway): streaming compressed payload
This change simplifies code responsible for streaming response and makes the streaming actually work by telling the payload compression stream to flush its content on every read(). (previous version was buffering entire thing in Hapi's compressor memory) We also do content-type detection based on the beginning of the stream by peeking at first `fileType.minimumBytes` bytes. License: MIT Signed-off-by: Marcin Rataj <[email protected]>
1 parent dd817ad commit 373f69e

File tree

3 files changed

+57
-49
lines changed

3 files changed

+57
-49
lines changed

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
"bl": "^3.0.0",
8989
"boom": "^7.2.0",
9090
"bs58": "^4.0.1",
91+
"buffer-peek-stream": "^1.0.1",
9192
"byteman": "^1.3.5",
9293
"cid-tool": "~0.2.0",
9394
"cids": "~0.5.8",

src/http/gateway/resources/gateway.js

+50-49
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@
33
const debug = require('debug')
44
const log = debug('ipfs:http-gateway')
55
log.error = debug('ipfs:http-gateway:error')
6-
const pull = require('pull-stream')
7-
const pushable = require('pull-pushable')
8-
const toStream = require('pull-stream-to-stream')
6+
97
const fileType = require('file-type')
108
const mime = require('mime-types')
119
const { PassThrough } = require('readable-stream')
1210
const Boom = require('boom')
11+
const peek = require('buffer-peek-stream')
1312

1413
const { resolver } = require('ipfs-http-response')
1514
const PathUtils = require('../utils/path')
@@ -30,6 +29,20 @@ function detectContentType (ref, chunk) {
3029
return mime.contentType(mimeType)
3130
}
3231

32+
// Enable streaming of compressed payload
33+
// https://github.com/hapijs/hapi/issues/3599
34+
class ResponseStream extends PassThrough {
35+
_read (size) {
36+
super._read(size)
37+
if (this._compressor) {
38+
this._compressor.flush()
39+
}
40+
}
41+
setCompressor (compressor) {
42+
this._compressor = compressor
43+
}
44+
}
45+
3346
module.exports = {
3447
checkCID (request, h) {
3548
if (!request.params.cid) {
@@ -85,58 +98,46 @@ module.exports = {
8598
return h.redirect(PathUtils.removeTrailingSlash(ref)).permanent(true)
8699
}
87100

88-
return new Promise((resolve, reject) => {
89-
let pusher
90-
let started = false
91-
92-
pull(
93-
ipfs.catPullStream(data.cid),
94-
pull.drain(
95-
chunk => {
96-
if (!started) {
97-
started = true
98-
pusher = pushable()
99-
const res = h.response(toStream.source(pusher).pipe(new PassThrough()))
100-
101-
// Etag maps directly to an identifier for a specific version of a resource
102-
res.header('Etag', `"${data.cid}"`)
101+
const rawStream = ipfs.catReadableStream(data.cid)
102+
const responseStream = new ResponseStream()
103+
104+
// Pass-through Content-Type sniffing over initial bytes
105+
const contentType = await new Promise((resolve, reject) => {
106+
try {
107+
const peekBytes = fileType.minimumBytes
108+
peek(rawStream, peekBytes, (err, streamHead, outputStream) => {
109+
if (err) {
110+
log.error(err)
111+
return reject(err)
112+
}
113+
outputStream.pipe(responseStream)
114+
resolve(detectContentType(ref, streamHead))
115+
})
116+
} catch (err) {
117+
log.error(err)
118+
reject(err)
119+
}
120+
})
103121

104-
// Set headers specific to the immutable namespace
105-
if (ref.startsWith('/ipfs/')) {
106-
res.header('Cache-Control', 'public, max-age=29030400, immutable')
107-
}
122+
const res = h.response(responseStream)
108123

109-
const contentType = detectContentType(ref, chunk)
124+
// Etag maps directly to an identifier for a specific version of a resource
125+
res.header('Etag', `"${data.cid}"`)
110126

111-
log('ref ', ref)
112-
log('mime-type ', contentType)
127+
// Set headers specific to the immutable namespace
128+
if (ref.startsWith('/ipfs/')) {
129+
res.header('Cache-Control', 'public, max-age=29030400, immutable')
130+
}
113131

114-
if (contentType) {
115-
log('writing content-type header')
116-
res.header('Content-Type', contentType)
117-
}
132+
log('ref ', ref)
133+
log('content-type ', contentType)
118134

119-
resolve(res)
120-
}
121-
pusher.push(chunk)
122-
},
123-
err => {
124-
if (err) {
125-
log.error(err)
126-
127-
// We already started flowing, abort the stream
128-
if (started) {
129-
return pusher.end(err)
130-
}
131-
132-
return reject(err)
133-
}
135+
if (contentType) {
136+
log('writing content-type header')
137+
res.header('Content-Type', contentType)
138+
}
134139

135-
pusher.end()
136-
}
137-
)
138-
)
139-
})
140+
return res
140141
},
141142

142143
afterHandler (request, h) {

test/gateway/index.js

+6
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,15 @@ describe('HTTP Gateway', function () {
156156

157157
expect(res.statusCode).to.equal(200)
158158
expect(res.rawPayload).to.eql(bigFile)
159+
expect(res.headers['x-ipfs-path']).to.equal(`/ipfs/${bigFileHash}`)
160+
expect(res.headers['etag']).to.equal(`"${bigFileHash}"`)
161+
expect(res.headers['cache-control']).to.equal('public, max-age=29030400, immutable')
162+
expect(res.headers['content-type']).to.equal('application/octet-stream')
159163
})
160164

161165
it('load a jpg file', async () => {
162166
const kitty = 'QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ/cat.jpg'
167+
const kittyDirectCid = 'Qmd286K6pohQcTKYqnS1YhWrCiS4gz7Xi34sdwMe9USZ7u'
163168

164169
const res = await gateway.inject({
165170
method: 'GET',
@@ -169,6 +174,7 @@ describe('HTTP Gateway', function () {
169174
expect(res.statusCode).to.equal(200)
170175
expect(res.headers['content-type']).to.equal('image/jpeg')
171176
expect(res.headers['x-ipfs-path']).to.equal('/ipfs/' + kitty)
177+
expect(res.headers['etag']).to.equal(`"${kittyDirectCid}"`)
172178
expect(res.headers['cache-control']).to.equal('public, max-age=29030400, immutable')
173179
expect(res.headers.etag).to.equal('"Qmd286K6pohQcTKYqnS1YhWrCiS4gz7Xi34sdwMe9USZ7u"')
174180
expect(res.headers.suborigin).to.equal('ipfs000bafybeidsg6t7ici2osxjkukisd5inixiunqdpq2q5jy4a2ruzdf6ewsqk4')

0 commit comments

Comments
 (0)