From cae1ce6b210b6e5fe2e70e7816c8c5da4a5d6403 Mon Sep 17 00:00:00 2001
From: Carlos Lostao
Date: Fri, 20 Dec 2024 13:49:02 +0100
Subject: [PATCH] update: remove observable interface

---
 packages/auto-drive/package.json            |   1 -
 packages/auto-drive/src/api/wrappers.ts     | 225 ++++++++++----------
 packages/auto-drive/src/fs/wrappers.ts      |  83 ++++----
 packages/auto-drive/src/utils/index.ts      |   1 -
 packages/auto-drive/src/utils/observable.ts |  19 --
 packages/auto-drive/tsconfig.json           |   4 +-
 yarn.lock                                   |   5 +-
 7 files changed, 162 insertions(+), 176 deletions(-)
 delete mode 100644 packages/auto-drive/src/utils/observable.ts

diff --git a/packages/auto-drive/package.json b/packages/auto-drive/package.json
index fe254c56..58e771e8 100644
--- a/packages/auto-drive/package.json
+++ b/packages/auto-drive/package.json
@@ -46,7 +46,6 @@
     "jszip": "^3.10.1",
     "mime-types": "^2.1.35",
     "process": "^0.11.10",
-    "rxjs": "^7.8.1",
     "stream": "^0.0.3",
     "zod": "^3.23.8"
   },
diff --git a/packages/auto-drive/src/api/wrappers.ts b/packages/auto-drive/src/api/wrappers.ts
index 043f58a0..9516eca2 100644
--- a/packages/auto-drive/src/api/wrappers.ts
+++ b/packages/auto-drive/src/api/wrappers.ts
@@ -1,16 +1,15 @@
 import mime from 'mime-types'
 import { asyncByChunk, asyncFromStream, fileToIterable } from '../utils/async'
 import { progressToPercentage } from '../utils/misc'
-import { PromisedObservable } from '../utils/observable'
 import { apiCalls } from './calls/index'
 import { AutoDriveApi } from './connection'
 import { GenericFile, GenericFileWithinFolder } from './models/file'
 import { constructFromInput, constructZipBlobFromTreeAndPaths } from './models/folderTree'
-import { UploadChunksStatus, UploadFileStatus, UploadFolderStatus } from './models/uploads'
 
 export type UploadFileOptions = {
   password?: string
   compression?: boolean
+  onProgress?: (progress: number) => void
 }
 
 const UPLOAD_FILE_CHUNK_SIZE = 1024 * 1024
@@ -20,17 +19,23 @@ const uploadFileChunks = (
   fileUploadId: string,
   asyncIterable: AsyncIterable<Buffer>,
   uploadChunkSize: number = UPLOAD_FILE_CHUNK_SIZE,
-): PromisedObservable<UploadChunksStatus> => {
-  return new PromisedObservable(async (subscriber) => {
-    let index = 0
-    let uploadBytes = 0
-    for await (const chunk of asyncByChunk(asyncIterable, uploadChunkSize)) {
-      await apiCalls.uploadFileChunk(api, { uploadId: fileUploadId, chunk, index })
-      uploadBytes += chunk.length
-      subscriber.next({ uploadBytes })
-      index++
+  onProgress?: (uploadedBytes: number) => void,
+): Promise<void> => {
+  return new Promise(async (resolve, reject) => {
+    try {
+      let index = 0
+      let uploadBytes = 0
+      for await (const chunk of asyncByChunk(asyncIterable, uploadChunkSize)) {
+        await apiCalls.uploadFileChunk(api, { uploadId: fileUploadId, chunk, index })
+        uploadBytes += chunk.length
+        onProgress?.(uploadBytes)
+        index++
+      }
+
+      resolve()
+    } catch (e) {
+      reject(e)
     }
-    subscriber.complete()
   })
 }
 
@@ -56,11 +61,12 @@ export const uploadFileFromInput = (
   file: File,
   options: UploadFileOptions = {},
   uploadChunkSize?: number,
-): PromisedObservable<UploadFileStatus> => {
+): Promise<string> => {
   const { password = undefined, compression = true } = options
-  return new PromisedObservable(async (subscriber) => {
-    const { stringToCid, compressFile, CompressionAlgorithm, encryptFile, EncryptionAlgorithm } =
-      await import('@autonomys/auto-dag-data')
+  return new Promise(async (resolve, reject) => {
+    const { compressFile, CompressionAlgorithm, encryptFile, EncryptionAlgorithm } = await import(
+      '@autonomys/auto-dag-data'
+    )
     let asyncIterable: AsyncIterable<Buffer> = fileToIterable(file)
 
     if (compression) {
@@ -95,14 +101,13 @@ export const uploadFileFromInput = (
       uploadOptions,
     })
 
-    await uploadFileChunks(api, fileUpload.id, asyncIterable, uploadChunkSize).forEach((e) =>
-      subscriber.next({ type: 'file', progress: progressToPercentage(e.uploadBytes, file.size) }),
-    )
+    await uploadFileChunks(api, fileUpload.id, asyncIterable, uploadChunkSize, (bytes) => {
+      options.onProgress?.(progressToPercentage(bytes, file.size))
+    })
 
     const result = await apiCalls.completeUpload(api, { uploadId: fileUpload.id })
 
-    subscriber.next({ type: 'file', progress: 100, cid: result.cid })
-    subscriber.complete()
+    resolve(result.cid)
   })
 }
 
@@ -123,60 +128,58 @@ export const uploadFileFromInput = (
  *
  * @returns {PromisedObservable} - An observable that emits the upload status.
  * @throws {Error} - Throws an error if the upload fails at any stage.
  */
-export const uploadFile = (
+export const uploadFile = async (
   api: AutoDriveApi,
   file: GenericFile,
   options: UploadFileOptions = {},
   uploadChunkSize?: number,
-): PromisedObservable<UploadFileStatus> => {
+): Promise<string> => {
   const { password = undefined, compression = true } = options
-  return new PromisedObservable(async (subscriber) => {
-    const { stringToCid, compressFile, CompressionAlgorithm, encryptFile, EncryptionAlgorithm } =
-      await import('@autonomys/auto-dag-data')
-    let asyncIterable: AsyncIterable<Buffer> = file.read()
-
-    if (compression) {
-      asyncIterable = compressFile(asyncIterable, {
-        level: 9,
-        algorithm: CompressionAlgorithm.ZLIB,
-      })
-    }
-
-    if (password) {
-      asyncIterable = encryptFile(asyncIterable, password, {
-        algorithm: EncryptionAlgorithm.AES_256_GCM,
-      })
-    }
+  const { compressFile, CompressionAlgorithm, encryptFile, EncryptionAlgorithm } = await import(
+    '@autonomys/auto-dag-data'
+  )
+  let asyncIterable: AsyncIterable<Buffer> = file.read()
 
-    const uploadOptions = {
-      compression: compression
-        ? {
-            level: 9,
-            algorithm: CompressionAlgorithm.ZLIB,
-          }
-        : undefined,
-      encryption: password
-        ? {
-            algorithm: EncryptionAlgorithm.AES_256_GCM,
-          }
-        : undefined,
-    }
-    const fileUpload = await apiCalls.createFileUpload(api, {
-      mimeType: mime.lookup(file.name) || undefined,
-      filename: file.name,
-      uploadOptions,
+  if (compression) {
+    asyncIterable = compressFile(asyncIterable, {
+      level: 9,
+      algorithm: CompressionAlgorithm.ZLIB,
     })
+  }
 
-    await uploadFileChunks(api, fileUpload.id, asyncIterable, uploadChunkSize).forEach((e) =>
-      subscriber.next({ type: 'file', progress: progressToPercentage(e.uploadBytes, file.size) }),
-    )
+  if (password) {
+    asyncIterable = encryptFile(asyncIterable, password, {
+      algorithm: EncryptionAlgorithm.AES_256_GCM,
+    })
+  }
 
-    const result = await apiCalls.completeUpload(api, { uploadId: fileUpload.id })
+  const uploadOptions = {
+    compression: compression
+      ? {
+          level: 9,
+          algorithm: CompressionAlgorithm.ZLIB,
+        }
+      : undefined,
+    encryption: password
+      ? {
+          algorithm: EncryptionAlgorithm.AES_256_GCM,
+        }
+      : undefined,
+  }
+  const fileUpload = await apiCalls.createFileUpload(api, {
+    mimeType: mime.lookup(file.name) || undefined,
+    filename: file.name,
+    uploadOptions,
+  })
 
-    subscriber.next({ type: 'file', progress: 100, cid: result.cid })
-    subscriber.complete()
+  await uploadFileChunks(api, fileUpload.id, asyncIterable, uploadChunkSize, (bytes) => {
+    options.onProgress?.(progressToPercentage(bytes, file.size))
   })
+
+  const result = await apiCalls.completeUpload(api, { uploadId: fileUpload.id })
+
+  return result.cid
 }
 
 /**
@@ -199,8 +202,12 @@ export const uploadFolderFromInput = async (
   api: AutoDriveApi,
   fileList: FileList | File[],
-  { uploadChunkSize, password }: { uploadChunkSize?: number; password?: string } = {},
-): Promise<PromisedObservable<UploadFolderStatus | UploadFileStatus>> => {
+  {
+    uploadChunkSize,
+    password,
+    onProgress,
+  }: { uploadChunkSize?: number; password?: string; onProgress?: (progress: number) => void } = {},
+): Promise<string> => {
   const files = fileList instanceof FileList ? Array.from(fileList) : fileList
   const fileTree = constructFromInput(files)
 
@@ -223,45 +230,42 @@ export const uploadFolderFromInput = async (
       {
         password,
         compression: true,
+        onProgress,
       },
     )
   }
 
-  return new PromisedObservable(async (subscriber) => {
-    // Otherwise, we upload the files as a folder w/o compression or encryption
-    const folderUpload = await apiCalls.createFolderUpload(api, {
-      fileTree,
-    })
+  // Otherwise, we upload the files as a folder w/o compression or encryption
+  const folderUpload = await apiCalls.createFolderUpload(api, {
+    fileTree,
+  })
 
-    let currentBytesUploaded = 0
-    const totalSize = files.reduce((acc, file) => acc + file.size, 0)
-    for (const file of files) {
-      await uploadFileWithinFolderUpload(
-        api,
-        folderUpload.id,
-        {
-          read: () => fileToIterable(file),
-          name: file.name,
-          mimeType: mime.lookup(file.name) || undefined,
-          size: file.size,
-          path: file.webkitRelativePath,
+  let currentBytesUploaded = 0
+  const totalSize = files.reduce((acc, file) => acc + file.size, 0)
+  for (const file of files) {
+    await uploadFileWithinFolderUpload(
+      api,
+      folderUpload.id,
+      {
+        read: () => fileToIterable(file),
+        name: file.name,
+        mimeType: mime.lookup(file.name) || undefined,
+        size: file.size,
+        path: file.webkitRelativePath,
+      },
+      uploadChunkSize,
+      {
+        onProgress: (progress) => {
+          onProgress?.(progressToPercentage(currentBytesUploaded + progress, totalSize))
         },
-        uploadChunkSize,
-      ).forEach((e) => {
-        subscriber.next({
-          type: 'folder',
-          progress: progressToPercentage(currentBytesUploaded + e.uploadBytes, totalSize),
-        })
-      })
-
-      currentBytesUploaded += file.size
-    }
+      },
+    )
+    currentBytesUploaded += file.size
+  }
 
-    const result = await apiCalls.completeUpload(api, { uploadId: folderUpload.id })
+  const result = await apiCalls.completeUpload(api, { uploadId: folderUpload.id })
 
-    subscriber.next({ type: 'folder', progress: 100, cid: result.cid })
-    subscriber.complete()
-  })
+  return result.cid
 }
 
 /**
@@ -273,29 +277,26 @@
  *
  * @returns {Promise} A promise that resolves when the file upload is complete.
  */
-export const uploadFileWithinFolderUpload = (
+export const uploadFileWithinFolderUpload = async (
   api: AutoDriveApi,
   uploadId: string,
   file: GenericFileWithinFolder,
   uploadChunkSize?: number,
-): PromisedObservable<UploadChunksStatus> => {
-  return new PromisedObservable(async (subscriber) => {
-    const fileUpload = await apiCalls.createFileUploadWithinFolderUpload(api, {
-      uploadId,
-      name: file.name,
-      mimeType: file.mimeType,
-      relativeId: file.path,
-      uploadOptions: {},
-    })
+  options: Pick<UploadFileOptions, 'onProgress'> = {},
+): Promise<string> => {
+  const fileUpload = await apiCalls.createFileUploadWithinFolderUpload(api, {
+    uploadId,
+    name: file.name,
+    mimeType: file.mimeType,
+    relativeId: file.path,
+    uploadOptions: {},
+  })
 
-    await uploadFileChunks(api, fileUpload.id, file.read(), uploadChunkSize).forEach((e) =>
-      subscriber.next({ uploadBytes: e.uploadBytes }),
-    )
+  await uploadFileChunks(api, fileUpload.id, file.read(), uploadChunkSize, options.onProgress)
 
-    await apiCalls.completeUpload(api, { uploadId: fileUpload.id })
+  const result = await apiCalls.completeUpload(api, { uploadId: fileUpload.id })
 
-    subscriber.complete()
-  })
+  return result.cid
 }
 
 /**
diff --git a/packages/auto-drive/src/fs/wrappers.ts b/packages/auto-drive/src/fs/wrappers.ts
index 3cef329b..ec0f05ac 100644
--- a/packages/auto-drive/src/fs/wrappers.ts
+++ b/packages/auto-drive/src/fs/wrappers.ts
@@ -4,11 +4,10 @@
 import { AutoDriveApi } from '../api/connection.js'
 import { apiCalls } from '../api/index.js'
 import { GenericFileWithinFolder } from '../api/models/file.js'
 import { constructFromFileSystemEntries } from '../api/models/folderTree.js'
-import { UploadFileStatus, UploadFolderStatus } from '../api/models/uploads.js'
+import { CompressionAlgorithm } from '../api/models/uploads.js'
 import { uploadFile, UploadFileOptions, uploadFileWithinFolderUpload } from '../api/wrappers.js'
 import { fileToIterable } from '../utils/index.js'
 import { progressToPercentage } from '../utils/misc.js'
-import { PromisedObservable } from '../utils/observable.js'
 import { constructZipFromTreeAndFileSystemPaths, getFiles } from './utils.js'
 
 /**
@@ -33,8 +32,8 @@ export const uploadFileFromFilepath = (
   filePath: string,
   options: UploadFileOptions = {},
   uploadChunkSize?: number,
-): PromisedObservable<UploadFileStatus> => {
-  const { password = undefined, compression = true } = options
+): Promise<string> => {
+  const { password = undefined, compression = true, onProgress } = options
   const name = filePath.split('/').pop()!
 
   return uploadFile(
@@ -48,6 +47,7 @@ export const uploadFileFromFilepath = (
     {
       password,
       compression,
+      onProgress,
     },
     uploadChunkSize,
   )
 }
 
@@ -74,8 +74,16 @@ export const uploadFolderFromFolderPath = async (
   api: AutoDriveApi,
   folderPath: string,
-  { uploadChunkSize, password }: { uploadChunkSize?: number; password?: string } = {},
-): Promise<PromisedObservable<UploadFolderStatus | UploadFileStatus>> => {
+  {
+    uploadChunkSize,
+    password,
+    onProgress,
+  }: {
+    uploadChunkSize?: number
+    password?: string
+    onProgress?: (progressInPercentage: number) => void
+  } = {},
+): Promise<string> => {
   const files = await getFiles(folderPath)
   const fileTree = constructFromFileSystemEntries(files)
 
@@ -94,46 +102,43 @@
       {
         password,
         compression: true,
+        onProgress: (progressInPercentage) => {
+          onProgress?.(progressToPercentage(progressInPercentage, zipBlob.size))
+        },
       },
     )
   }
 
-  return new PromisedObservable(async (subscriber) => {
-    const { CompressionAlgorithm } = await import('@autonomys/auto-dag-data')
-    const folderUpload = await apiCalls.createFolderUpload(api, {
-      fileTree,
-      uploadOptions: {
-        compression: {
-          algorithm: CompressionAlgorithm.ZLIB,
-          level: 9,
-        },
+  const folderUpload = await apiCalls.createFolderUpload(api, {
+    fileTree,
+    uploadOptions: {
+      compression: {
+        algorithm: CompressionAlgorithm.ZLIB,
+        level: 9,
       },
-    })
-
-    const genericFiles: GenericFileWithinFolder[] = files.map((file) => ({
-      read: () => fs.createReadStream(file),
-      name: file.split('/').pop()!,
-      mimeType: mime.lookup(file.split('/').pop()!) || undefined,
-      size: fs.statSync(file).size,
-      path: file,
-    }))
+    },
+  })
 
-    const totalSize = genericFiles.reduce((acc, file) => acc + file.size, 0)
+  const genericFiles: GenericFileWithinFolder[] = files.map((file) => ({
+    read: () => fs.createReadStream(file),
+    name: file.split('/').pop()!,
+    mimeType: mime.lookup(file.split('/').pop()!) || undefined,
+    size: fs.statSync(file).size,
+    path: file,
+  }))
 
-    let progress = 0
-    for (const file of genericFiles) {
-      await uploadFileWithinFolderUpload(api, folderUpload.id, file, uploadChunkSize).forEach((e) =>
-        subscriber.next({
-          type: 'folder',
-          progress: progressToPercentage(progress + e.uploadBytes, totalSize),
-        }),
-      )
-      progress += file.size
-    }
+  const totalSize = genericFiles.reduce((acc, file) => acc + file.size, 0)
+  let progress = 0
+  for (const file of genericFiles) {
+    await uploadFileWithinFolderUpload(api, folderUpload.id, file, uploadChunkSize, {
+      onProgress: (uploadedBytes) => {
+        onProgress?.(progressToPercentage(progress + uploadedBytes, totalSize))
+      },
+    })
+    progress += file.size
+  }
 
-    const result = await apiCalls.completeUpload(api, { uploadId: folderUpload.id })
+  const result = await apiCalls.completeUpload(api, { uploadId: folderUpload.id })
 
-    subscriber.next({ type: 'folder', progress: 100, cid: result.cid })
-    subscriber.complete()
-  })
+  return result.cid
 }
diff --git a/packages/auto-drive/src/utils/index.ts b/packages/auto-drive/src/utils/index.ts
index 77a11717..eb878797 100644
--- a/packages/auto-drive/src/utils/index.ts
+++ b/packages/auto-drive/src/utils/index.ts
@@ -1,4 +1,3 @@
 export * from './async'
 export * from './misc'
-export * from './observable'
 export * from './types'
diff --git a/packages/auto-drive/src/utils/observable.ts b/packages/auto-drive/src/utils/observable.ts
deleted file mode 100644
index e687c5be..00000000
--- a/packages/auto-drive/src/utils/observable.ts
+++ /dev/null
@@ -1,19 +0,0 @@
-import * as rxjs from 'rxjs'
-
-const asyncCallback = <T, O>(callback: (t: T) => O) => {
-  return (t: T) => {
-    callback(t)
-  }
-}
-
-export class PromisedObservable<T> extends rxjs.Observable<T> {
-  constructor(subscribe?: (this: rxjs.Observable<T>, subscriber: rxjs.Subscriber<T>) => void) {
-    super(subscribe && asyncCallback(subscribe))
-  }
-
-  get promise(): Promise<T> {
-    return lastValueFrom(this)
-  }
-}
-
-export const { firstValueFrom, lastValueFrom } = rxjs
diff --git a/packages/auto-drive/tsconfig.json b/packages/auto-drive/tsconfig.json
index 49e05cea..b4430d1b 100644
--- a/packages/auto-drive/tsconfig.json
+++ b/packages/auto-drive/tsconfig.json
@@ -2,7 +2,9 @@
   "extends": "../../tsconfig.json",
   "compilerOptions": {
     "outDir": "./dist",
-    "rootDir": "./src"
+    "rootDir": "./src",
+    "module": "Node16",
+    "moduleResolution": "Node16"
   },
   "include": ["src"]
 }
diff --git a/yarn.lock b/yarn.lock
index 21f08297..a3525a5a 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -82,7 +82,6 @@ __metadata:
     process: "npm:^0.11.10"
    rollup-plugin-jscc: "npm:^2.0.0"
    rollup-plugin-terser: "npm:^7.0.2"
-    rxjs: "npm:^7.8.1"
     stream: "npm:^0.0.3"
     tslib: "npm:^2.8.1"
     typescript: "npm:^5.6.3"
@@ -12366,11 +12365,11 @@
 
 "typescript@patch:typescript@npm%3A>=3 < 6#optional!builtin<compat/typescript>, typescript@patch:typescript@npm%3A^5#optional!builtin<compat/typescript>, typescript@patch:typescript@npm%3A^5.4.5#optional!builtin<compat/typescript>, typescript@patch:typescript@npm%3A^5.5.4#optional!builtin<compat/typescript>, typescript@patch:typescript@npm%3A^5.6.2#optional!builtin<compat/typescript>, typescript@patch:typescript@npm%3A^5.6.3#optional!builtin<compat/typescript>":
   version: 5.7.2
-  resolution: "typescript@patch:typescript@npm%3A5.7.2#optional!builtin<compat/typescript>::version=5.7.2&hash=b45daf"
+  resolution: "typescript@patch:typescript@npm%3A5.7.2#optional!builtin<compat/typescript>::version=5.7.2&hash=5786d5"
   bin:
     tsc: bin/tsc
     tsserver: bin/tsserver
-  checksum: 10c0/c891ccf04008bc1305ba34053db951f8a4584b4a1bf2f68fd972c4a354df3dc5e62c8bfed4f6ac2d12e5b3b1c49af312c83a651048f818cd5b4949d17baacd79
+  checksum: 10c0/f3b8082c9d1d1629a215245c9087df56cb784f9fb6f27b5d55577a20e68afe2a889c040aacff6d27e35be165ecf9dca66e694c42eb9a50b3b2c451b36b5675cb
   languageName: node
   linkType: hard
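
Usage note (illustrative, not part of the patch): with the observable interface removed, the upload wrappers resolve with the upload's CID and report progress through the optional onProgress callback in UploadFileOptions instead of emitting UploadFileStatus/UploadFolderStatus events. A minimal sketch of the new call shape, assuming uploadFileFromInput and AutoDriveApi are re-exported from the package entry point; the helper name below is hypothetical:

// Sketch only: progress reporting with the callback-based API introduced by this patch.
import { AutoDriveApi, uploadFileFromInput } from '@autonomys/auto-drive'

export const uploadWithProgress = async (api: AutoDriveApi, file: File): Promise<string> => {
  const cid = await uploadFileFromInput(api, file, {
    compression: true,
    // password: 'optional encryption passphrase',
    onProgress: (progress) => {
      // progress is a percentage (0-100) derived from the uploaded bytes
      console.log(`upload ${progress.toFixed(0)}% complete`)
    },
  })
  return cid // previously delivered as the final event of the PromisedObservable
}

The same shape applies to uploadFile, uploadFolderFromInput, uploadFileFromFilepath and uploadFolderFromFolderPath, which now resolve with the file or folder CID directly.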