Skip to content

Commit

Permalink
feat: use worker thread for piece hashing
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Shaw committed Jun 24, 2024
1 parent 2a60b1b commit 5ce8608
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 21 deletions.
18 changes: 2 additions & 16 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ import ora from 'ora'
import { CID } from 'multiformats/cid'
import { base64 } from 'multiformats/bases/base64'
import { identity } from 'multiformats/hashes/identity'
import * as Digest from 'multiformats/hashes/digest'
import * as DID from '@ipld/dag-ucan/did'
import * as dagJSON from '@ipld/dag-json'
import { CarWriter } from '@ipld/car'
import { filesFromPaths } from 'files-from-path'
import * as PieceHasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash'
import * as Account from './account.js'

import { spaceAccess } from '@web3-storage/w3up-client/capability/access'
Expand All @@ -26,6 +24,7 @@ import {
readProofFromBytes,
uploadListResponseToString,
startOfLastMonth,
pieceHasher,
} from './lib.js'
import * as ucanto from '@ucanto/core'
import { ed25519 } from '@ucanto/principal'
Expand Down Expand Up @@ -160,20 +159,7 @@ export async function upload(firstPath, opts) {
: `Storing ${Math.min(Math.round((totalSent / totalSize) * 100), 100)}%`

const root = await uploadFn({
pieceHasher: {
code: PieceHasher.code,
name: 'fr32-sha2-256-trunc254-padded-binary-tree-multihash',
async digest (input) {
const hasher = PieceHasher.create()
hasher.write(input)

const bytes = new Uint8Array(hasher.multihashByteLength())
hasher.digestInto(bytes, 0, true)
hasher.free()

return Digest.decode(bytes)
}
},
pieceHasher,
onShardStored: ({ cid, size, piece }) => {
totalSent += size
if (opts?.verbose) {
Expand Down
37 changes: 32 additions & 5 deletions lib.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import fs from 'fs'
import path from 'path'
import fs from 'node:fs'
import path from 'node:path'
import { Worker } from 'node:worker_threads'
import { fileURLToPath } from 'node:url'
// @ts-expect-error no typings :(
import tree from 'pretty-tree'
import { importDAG } from '@ucanto/core/delegation'
Expand Down Expand Up @@ -28,9 +30,9 @@ import chalk from 'chalk'
* @typedef {import('@web3-storage/capabilities/types').FilecoinInfoSuccess} FilecoinInfoSuccess
*/

/**
*
*/
const __filename = fileURLToPath(import.meta.url)
const __dirname = path.dirname(__filename)

export function getPkg() {

Check warning on line 36 in lib.js

View workflow job for this annotation

GitHub Actions / Test

Missing JSDoc comment
// @ts-ignore JSON.parse works with Buffer in Node.js
return JSON.parse(fs.readFileSync(new URL('./package.json', import.meta.url)))
Expand Down Expand Up @@ -347,3 +349,28 @@ export const streamToBlob = async source => {
}))
return new Blob(chunks)
}

const workerPath = path.join(__dirname, 'piece-hasher-worker.js')

/** @see https://github.com/multiformats/multicodec/pull/331/files */
const pieceHasherCode = 0x1011

/** @type {import('multiformats').MultihashHasher<typeof pieceHasherCode>} */
export const pieceHasher = {
code: pieceHasherCode,
name: 'fr32-sha2-256-trunc254-padded-binary-tree',
async digest (input) {
const bytes = await new Promise((resolve, reject) => {
const worker = new Worker(workerPath, { workerData: input })
worker.on('message', resolve)
worker.on('error', reject)
worker.on('exit', (code) => {
if (code !== 0) reject(new Error(`Piece hasher worker exited with code: ${code}`))
})
})
const digest =
/** @type {import('multiformats').MultihashDigest<typeof pieceHasherCode>} */
(Digest.decode(bytes))
return digest
}
}
11 changes: 11 additions & 0 deletions piece-hasher-worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { parentPort, workerData } from 'node:worker_threads'
import * as PieceHasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash'

const hasher = PieceHasher.create()
hasher.write(workerData)

const bytes = new Uint8Array(hasher.multihashByteLength())
hasher.digestInto(bytes, 0, true)
hasher.free()

parentPort?.postMessage(bytes)

0 comments on commit 5ce8608

Please sign in to comment.