Skip to content

Commit

Permalink
update: remove observable interface
Browse files Browse the repository at this point in the history
  • Loading branch information
clostao committed Dec 20, 2024
1 parent 737fa13 commit cae1ce6
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 176 deletions.
1 change: 0 additions & 1 deletion packages/auto-drive/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
"jszip": "^3.10.1",
"mime-types": "^2.1.35",
"process": "^0.11.10",
"rxjs": "^7.8.1",
"stream": "^0.0.3",
"zod": "^3.23.8"
},
Expand Down
225 changes: 113 additions & 112 deletions packages/auto-drive/src/api/wrappers.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import mime from 'mime-types'
import { asyncByChunk, asyncFromStream, fileToIterable } from '../utils/async'
import { progressToPercentage } from '../utils/misc'
import { PromisedObservable } from '../utils/observable'
import { apiCalls } from './calls/index'
import { AutoDriveApi } from './connection'
import { GenericFile, GenericFileWithinFolder } from './models/file'
import { constructFromInput, constructZipBlobFromTreeAndPaths } from './models/folderTree'
import { UploadChunksStatus, UploadFileStatus, UploadFolderStatus } from './models/uploads'

export type UploadFileOptions = {
password?: string
compression?: boolean
onProgress?: (progress: number) => void
}

const UPLOAD_FILE_CHUNK_SIZE = 1024 * 1024
Expand All @@ -20,17 +19,23 @@ const uploadFileChunks = (
fileUploadId: string,
asyncIterable: AsyncIterable<Buffer>,
uploadChunkSize: number = UPLOAD_FILE_CHUNK_SIZE,
): PromisedObservable<UploadChunksStatus> => {
return new PromisedObservable<UploadChunksStatus>(async (subscriber) => {
let index = 0
let uploadBytes = 0
for await (const chunk of asyncByChunk(asyncIterable, uploadChunkSize)) {
await apiCalls.uploadFileChunk(api, { uploadId: fileUploadId, chunk, index })
uploadBytes += chunk.length
subscriber.next({ uploadBytes })
index++
onProgress?: (uploadedBytes: number) => void,
): Promise<void> => {
return new Promise(async (resolve, reject) => {
try {
let index = 0
let uploadBytes = 0
for await (const chunk of asyncByChunk(asyncIterable, uploadChunkSize)) {
await apiCalls.uploadFileChunk(api, { uploadId: fileUploadId, chunk, index })
uploadBytes += chunk.length
onProgress?.(uploadBytes)
index++
}

resolve()
} catch (e) {
reject(e)
}
subscriber.complete()
})
}

Expand All @@ -56,11 +61,12 @@ export const uploadFileFromInput = (
file: File,
options: UploadFileOptions = {},
uploadChunkSize?: number,
): PromisedObservable<UploadFileStatus> => {
): Promise<string> => {
const { password = undefined, compression = true } = options
return new PromisedObservable<UploadFileStatus>(async (subscriber) => {
const { stringToCid, compressFile, CompressionAlgorithm, encryptFile, EncryptionAlgorithm } =
await import('@autonomys/auto-dag-data')
return new Promise(async (resolve, reject) => {
const { compressFile, CompressionAlgorithm, encryptFile, EncryptionAlgorithm } = await import(
'@autonomys/auto-dag-data'
)
let asyncIterable: AsyncIterable<Buffer> = fileToIterable(file)

if (compression) {
Expand Down Expand Up @@ -95,14 +101,13 @@ export const uploadFileFromInput = (
uploadOptions,
})

await uploadFileChunks(api, fileUpload.id, asyncIterable, uploadChunkSize).forEach((e) =>
subscriber.next({ type: 'file', progress: progressToPercentage(e.uploadBytes, file.size) }),
)
await uploadFileChunks(api, fileUpload.id, asyncIterable, uploadChunkSize, (bytes) => {
options.onProgress?.(progressToPercentage(bytes, file.size))
})

const result = await apiCalls.completeUpload(api, { uploadId: fileUpload.id })

subscriber.next({ type: 'file', progress: 100, cid: result.cid })
subscriber.complete()
resolve(result.cid)
})
}

Expand All @@ -123,60 +128,58 @@ export const uploadFileFromInput = (
* @returns {PromisedObservable<UploadFileStatus>} - An observable that emits the upload status.
* @throws {Error} - Throws an error if the upload fails at any stage.
*/
export const uploadFile = (
export const uploadFile = async (
api: AutoDriveApi,
file: GenericFile,
options: UploadFileOptions = {},
uploadChunkSize?: number,
): PromisedObservable<UploadFileStatus> => {
): Promise<string> => {
const { password = undefined, compression = true } = options

return new PromisedObservable<UploadFileStatus>(async (subscriber) => {
const { stringToCid, compressFile, CompressionAlgorithm, encryptFile, EncryptionAlgorithm } =
await import('@autonomys/auto-dag-data')
let asyncIterable: AsyncIterable<Buffer> = file.read()

if (compression) {
asyncIterable = compressFile(asyncIterable, {
level: 9,
algorithm: CompressionAlgorithm.ZLIB,
})
}

if (password) {
asyncIterable = encryptFile(asyncIterable, password, {
algorithm: EncryptionAlgorithm.AES_256_GCM,
})
}
const { compressFile, CompressionAlgorithm, encryptFile, EncryptionAlgorithm } = await import(
'@autonomys/auto-dag-data'
)
let asyncIterable: AsyncIterable<Buffer> = file.read()

const uploadOptions = {
compression: compression
? {
level: 9,
algorithm: CompressionAlgorithm.ZLIB,
}
: undefined,
encryption: password
? {
algorithm: EncryptionAlgorithm.AES_256_GCM,
}
: undefined,
}
const fileUpload = await apiCalls.createFileUpload(api, {
mimeType: mime.lookup(file.name) || undefined,
filename: file.name,
uploadOptions,
if (compression) {
asyncIterable = compressFile(asyncIterable, {
level: 9,
algorithm: CompressionAlgorithm.ZLIB,
})
}

await uploadFileChunks(api, fileUpload.id, asyncIterable, uploadChunkSize).forEach((e) =>
subscriber.next({ type: 'file', progress: progressToPercentage(e.uploadBytes, file.size) }),
)
if (password) {
asyncIterable = encryptFile(asyncIterable, password, {
algorithm: EncryptionAlgorithm.AES_256_GCM,
})
}

const result = await apiCalls.completeUpload(api, { uploadId: fileUpload.id })
const uploadOptions = {
compression: compression
? {
level: 9,
algorithm: CompressionAlgorithm.ZLIB,
}
: undefined,
encryption: password
? {
algorithm: EncryptionAlgorithm.AES_256_GCM,
}
: undefined,
}
const fileUpload = await apiCalls.createFileUpload(api, {
mimeType: mime.lookup(file.name) || undefined,
filename: file.name,
uploadOptions,
})

subscriber.next({ type: 'file', progress: 100, cid: result.cid })
subscriber.complete()
await uploadFileChunks(api, fileUpload.id, asyncIterable, uploadChunkSize, (bytes) => {
options.onProgress?.(progressToPercentage(bytes, file.size))
})

const result = await apiCalls.completeUpload(api, { uploadId: fileUpload.id })

return result.cid
}

/**
Expand All @@ -199,8 +202,12 @@ export const uploadFile = (
export const uploadFolderFromInput = async (
api: AutoDriveApi,
fileList: FileList | File[],
{ uploadChunkSize, password }: { uploadChunkSize?: number; password?: string } = {},
): Promise<PromisedObservable<UploadFileStatus | UploadFolderStatus>> => {
{
uploadChunkSize,
password,
onProgress,
}: { uploadChunkSize?: number; password?: string; onProgress?: (progress: number) => void } = {},
): Promise<string> => {
const files = fileList instanceof FileList ? Array.from(fileList) : fileList
const fileTree = constructFromInput(files)

Expand All @@ -223,45 +230,42 @@ export const uploadFolderFromInput = async (
{
password,
compression: true,
onProgress,
},
)
}

return new PromisedObservable<UploadFolderStatus>(async (subscriber) => {
// Otherwise, we upload the files as a folder w/o compression or encryption
const folderUpload = await apiCalls.createFolderUpload(api, {
fileTree,
})
// Otherwise, we upload the files as a folder w/o compression or encryption
const folderUpload = await apiCalls.createFolderUpload(api, {
fileTree,
})

let currentBytesUploaded = 0
const totalSize = files.reduce((acc, file) => acc + file.size, 0)
for (const file of files) {
await uploadFileWithinFolderUpload(
api,
folderUpload.id,
{
read: () => fileToIterable(file),
name: file.name,
mimeType: mime.lookup(file.name) || undefined,
size: file.size,
path: file.webkitRelativePath,
let currentBytesUploaded = 0
const totalSize = files.reduce((acc, file) => acc + file.size, 0)
for (const file of files) {
await uploadFileWithinFolderUpload(
api,
folderUpload.id,
{
read: () => fileToIterable(file),
name: file.name,
mimeType: mime.lookup(file.name) || undefined,
size: file.size,
path: file.webkitRelativePath,
},
uploadChunkSize,
{
onProgress: (progress) => {
onProgress?.(progressToPercentage(currentBytesUploaded + progress, totalSize))
},
uploadChunkSize,
).forEach((e) => {
subscriber.next({
type: 'folder',
progress: progressToPercentage(currentBytesUploaded + e.uploadBytes, totalSize),
})
})

currentBytesUploaded += file.size
}
},
)
currentBytesUploaded += file.size
}

const result = await apiCalls.completeUpload(api, { uploadId: folderUpload.id })
const result = await apiCalls.completeUpload(api, { uploadId: folderUpload.id })

subscriber.next({ type: 'folder', progress: 100, cid: result.cid })
subscriber.complete()
})
return result.cid
}

/**
Expand All @@ -273,29 +277,26 @@ export const uploadFolderFromInput = async (
*
* @returns {Promise<void>} A promise that resolves when the file upload is complete.
*/
export const uploadFileWithinFolderUpload = (
export const uploadFileWithinFolderUpload = async (
api: AutoDriveApi,
uploadId: string,
file: GenericFileWithinFolder,
uploadChunkSize?: number,
): PromisedObservable<UploadChunksStatus> => {
return new PromisedObservable<UploadChunksStatus>(async (subscriber) => {
const fileUpload = await apiCalls.createFileUploadWithinFolderUpload(api, {
uploadId,
name: file.name,
mimeType: file.mimeType,
relativeId: file.path,
uploadOptions: {},
})
options: Pick<UploadFileOptions, 'onProgress'> = {},
): Promise<string> => {
const fileUpload = await apiCalls.createFileUploadWithinFolderUpload(api, {
uploadId,
name: file.name,
mimeType: file.mimeType,
relativeId: file.path,
uploadOptions: {},
})

await uploadFileChunks(api, fileUpload.id, file.read(), uploadChunkSize).forEach((e) =>
subscriber.next({ uploadBytes: e.uploadBytes }),
)
await uploadFileChunks(api, fileUpload.id, file.read(), uploadChunkSize, options.onProgress)

await apiCalls.completeUpload(api, { uploadId: fileUpload.id })
const result = await apiCalls.completeUpload(api, { uploadId: fileUpload.id })

subscriber.complete()
})
return result.cid
}

/**
Expand Down
Loading

0 comments on commit cae1ce6

Please sign in to comment.