This repository was archived by the owner on Jan 6, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy patharchiver.js
More file actions
323 lines (285 loc) · 9.94 KB
/
archiver.js
File metadata and controls
323 lines (285 loc) · 9.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
var path = require('path')
var promisify = require('es6-promisify')
var hyperdrive = require('hyperdrive')
var datEncoding = require('dat-encoding')
var discoverySwarm = require('discovery-swarm')
var swarmDefaults = require('datland-swarm-defaults')
var mkdirp = require('mkdirp')
var rimraf = require('rimraf')
var getFolderSize = require('get-folder-size')
var ms = require('ms')
var pda = require('pauls-dat-api')
var lock = require('./lock')
var debug = require('debug')('archiver')
var debugJobs = require('debug')('jobs')
// rebind the callback-style helpers as promise-returning functions
// (done at module load, before any method can call them)
mkdirp = promisify(mkdirp)
rimraf = promisify(rimraf)
getFolderSize = promisify(getFolderSize)
// exported api
// =
module.exports = class Archiver {
constructor (cloud) {
this.cloud = cloud
this.config = cloud.config
this.archives = {}
this.loadPromises = {}
// periodically construct the indexes
this.indexes = {popular: []}
this._startJob(this.computePopularIndex, 'popularArchivesIndex')
this._startJob(this.computeUserDiskUsageAndSwarm, 'userDiskUsage')
this._startJob(this.deleteDeadArchives, 'deleteDeadArchives')
}
// methods
// =
getArchive (key) {
return this.archives[key]
}
isLoadingArchive (key) {
return key in this.loadPromises
}
// load archive (wrapper) manages load promises
async loadArchive (key) {
key = datEncoding.toStr(key)
// fallback to archive if it exists
if (key in this.archives) {
return this.archives[key]
}
// fallback to the promise, if it exists
if (key in this.loadPromises) {
return this.loadPromises[key]
}
// ensure the folder exists
var archivePath = this._getArchiveFilesPath(key)
await mkdirp(archivePath)
// run and cache the promise
var p = this._loadArchiveInner(archivePath, key)
this.loadPromises[key] = p
// when done, clear the promise
const clear = () => delete this.loadPromises[key]
p.then(clear, clear)
// when done, save the archive instance
p.then(archive => { this.archives[key] = archive })
return p
}
async closeArchive (key) {
key = datEncoding.toStr(key)
var archive = this.archives[key]
if (archive) {
this._swarm(archive, {download: false, upload: false})
await new Promise(resolve => archive.close(resolve))
delete this.archives[key]
} else {
// is archive still loading?
// wait to finish then try to close
if (this.isLoadingArchive(key)) {
await this.loadPromises[key]
return this.closeArchive(key)
}
}
}
async closeAllArchives () {
return Promise.all(Object.keys(this.archives).map(key =>
this.closeArchive(key)
))
}
// helper only reads manifest from disk if DNE or changed
async getManifest (key) {
var archive = this.archives[datEncoding.toStr(key)]
if (!archive) {
return null
}
try {
var st = await pda.stat(archive, '/dat.json')
if (archive.manifest) {
if (st.offset === archive.manifest._offset) {
// use cached
return archive.manifest
}
}
archive.manifest = await pda.readManifest(archive)
archive.manifest._offset = st.offset
return archive.manifest
} catch (e) {
if (!e.notFound) {
console.error('Failed to load manifest for', archive.key, e)
}
return null
}
}
async computePopularIndex () {
var release = await lock('archiver-job')
try {
debugJobs('START Compute popular archives index')
var start = Date.now()
var popular = Object.keys(this.archives)
popular.sort((aKey, bKey) => (
this.archives[bKey].numPeers - this.archives[aKey].numPeers
))
this.indexes.popular = popular.slice(0, 100).map(key => (
{key, numPeers: this.archives[key].numPeers}
))
} catch (e) {
console.error(e)
debugJobs('FAILED Compute popular archives index (%dms)', (Date.now() - start))
} finally {
debugJobs('FINISH Compute popular archives index (%dms)', (Date.now() - start))
release()
}
}
async computeUserDiskUsageAndSwarm () {
var release = await lock('archiver-job')
try {
debugJobs('START Compute user quota usage')
var start = Date.now()
var users = await this.cloud.usersDB.list()
await Promise.all(users.map(async (userRecord) => {
// sum the disk usage of each archive
var diskUsage = 0
await Promise.all(userRecord.archives.map(async (archiveRecord) => {
var path = this._getArchiveFilesPath(archiveRecord.key)
var archive = this.getArchive(archiveRecord.key)
var archiveUsage = await getFolderSize(path)
if (archive) archive.diskUsage = archiveUsage
diskUsage += archiveUsage
}))
// store on the user record
userRecord.diskUsage = diskUsage
await this.cloud.usersDB.update(userRecord.id, {diskUsage})
// reconfigure swarms based on quota overages
var quotaPct = this.config.getUserDiskQuotaPct(userRecord)
userRecord.archives.forEach(archiveRecord => {
this._swarm(archiveRecord.key, {
upload: true, // always upload
download: quotaPct < 1 // only download if the user has capacity
})
})
}))
} catch (e) {
console.error(e)
debugJobs('FAILED Compute user quota usage (%dms)', (Date.now() - start))
} finally {
debugJobs('FINISH Compute user quota usage (%dms)', (Date.now() - start))
release()
}
}
async deleteDeadArchives () {
var release = await lock('archiver-job')
try {
debugJobs('START Delete dead archives')
var start = Date.now()
var deadArchiveKeys = await this.cloud.archivesDB.listDeadArchiveKeys()
await Promise.all(deadArchiveKeys.map(async (archiveKey) => {
// delete files
var archivePath = this._getArchiveFilesPath(archiveKey)
await rimraf(archivePath, {disableGlob: true})
}))
} catch (e) {
console.error(e)
debugJobs('FAILED Delete dead archives (%dms)', (Date.now() - start))
} finally {
debugJobs('FINISH Delete dead archives (%dms)', (Date.now() - start))
release()
}
}
// internal
// =
_getArchiveFilesPath (key) {
return path.join(this.config.dir, 'archives', key.slice(0, 2), key.slice(2))
}
_startJob (method, configKey) {
var i = setInterval(method.bind(this), ms(this.config.jobs[configKey]))
i.unref()
}
// load archive (inner) main load logic
async _loadArchiveInner (archivePath, key) {
// create the archive instance
var archive = hyperdrive(archivePath, key, {sparse: false})
archive.replicationStreams = [] // list of all active replication streams
archive.numPeers = 1 // num of active peers (1 for self)
archive.manifest = null // cached manifest
archive.diskUsage = 0 // cached disk usage
// wait for ready
await new Promise((resolve, reject) => {
archive.ready(err => {
if (err) reject(err)
else resolve()
})
})
return archive
}
// swarm archive
_swarm (archive, opts) {
if (typeof archive === 'string') {
archive = this.getArchive(archive)
if (!archive) return
}
// are any opts changed?
var so = archive.swarmOpts
if (so && so.download === opts.download && so.upload === opts.upload) {
return
}
// close existing swarm
if (archive.swarm) {
archive.replicationStreams.forEach(stream => stream.destroy()) // stop all active replications
archive.replicationStreams.length = 0
archive.swarm.destroy()
archive.swarm = null
}
// done?
if (opts.download === false && opts.upload === false) {
return
}
// join the swarm
var swarm = discoverySwarm(swarmDefaults({
hash: false,
utp: true,
tcp: true,
stream: (info) => {
var key = datEncoding.toStr(archive.key)
var dkey = datEncoding.toStr(archive.discoveryKey)
var chan = dkey.slice(0, 6) + '..' + dkey.slice(-2)
var keyStrShort = key.slice(0, 6) + '..' + key.slice(-2)
debug('new connection chan=%s type=%s host=%s key=%s', chan, info.type, info.host, keyStrShort)
// create the replication stream
var stream = archive.replicate({
download: opts.download,
upload: opts.upload,
live: true
})
stream.isActivePeer = false
archive.replicationStreams.push(stream)
stream.once('close', () => {
var rs = archive.replicationStreams
var i = rs.indexOf(stream)
if (i !== -1) rs.splice(rs.indexOf(stream), 1)
archive.numPeers = countActivePeers(archive)
})
// timeout the connection after 5s if handshake does not occur
var TO = setTimeout(() => {
debug('handshake timeout chan=%s type=%s host=%s key=%s', chan, info.type, info.host, keyStrShort)
stream.destroy(new Error('Timed out waiting for handshake'))
}, 5000)
stream.once('handshake', () => {
stream.isActivePeer = true
archive.numPeers = countActivePeers(archive)
clearTimeout(TO)
})
// debugging
stream.on('error', err => debug('error chan=%s type=%s host=%s key=%s', chan, info.type, info.host, keyStrShort, err))
stream.on('close', () => debug('closing connection chan=%s type=%s host=%s key=%s', chan, info.type, info.host, keyStrShort))
return stream
}
}))
swarm.listen(this.config.datPort)
swarm.on('error', err => debug('Swarm error for', datEncoding.toStr(archive.key), err))
swarm.join(archive.discoveryKey, { announce: !(opts.upload === false) })
debug('Swarming archive', datEncoding.toStr(archive.key), 'discovery key', datEncoding.toStr(archive.discoveryKey))
archive.swarm = swarm
archive.swarmOpts = opts
}
}
// count the replication streams that have completed their handshake,
// starting from one to include ourself
function countActivePeers (archive) {
  var active = 1 // self
  for (var stream of archive.replicationStreams) {
    if (stream.isActivePeer) active += 1
  }
  return active
}