diff --git a/src/services/runtime/types.ts b/src/services/runtime/types.ts index 452d5484..654ad302 100644 --- a/src/services/runtime/types.ts +++ b/src/services/runtime/types.ts @@ -83,7 +83,7 @@ export type StorageNodeInfo = { export type AssetUploadInput = { dataObjectId: DataObjectId - file: Readable + file: () => Promise } export type VideoFFProbeMetadata = { diff --git a/src/services/storage-node/api.ts b/src/services/storage-node/api.ts index 7e1c1a73..ddbe0653 100644 --- a/src/services/storage-node/api.ts +++ b/src/services/storage-node/api.ts @@ -11,6 +11,8 @@ import { LoggingService } from '../logging' import { QueryNodeApi } from '../query-node/api' import { getThumbnailAsset } from '../runtime/client' import { AssetUploadInput, StorageNodeInfo } from '../runtime/types' +import { Readable } from 'stream' +import sleep from 'sleep-promise' export type OperatorInfo = { id: string; endpoint: string } export type OperatorsMapping = Record @@ -18,6 +20,9 @@ export type VideoUploadResponse = { id: string // hash of dataObject uploaded } +const UPLOAD_MAX_ATTEMPTS = 3 +const UPLOAD_RETRY_INTERVAL = 10 + export class StorageNodeApi { private logger: Logger @@ -29,65 +34,57 @@ export class StorageNodeApi { const assetsInput: AssetUploadInput[] = [ { dataObjectId: createType('u64', new BN(video.joystreamVideo.assetIds[0])), - file: fs.createReadStream(videoFilePath), + file: async () => { return fs.createReadStream(videoFilePath) }, }, { dataObjectId: createType('u64', new BN(video.joystreamVideo.assetIds[1])), - file: await getThumbnailAsset(video.thumbnails), + file: async () => { return getThumbnailAsset(video.thumbnails) }, }, ] return this.upload(bagId, assetsInput) } private async upload(bagId: string, assets: AssetUploadInput[]) { - // Get a random active storage node for given bag - const operator = await this.getRandomActiveStorageNodeInfo(bagId) - if (!operator) { - throw new StorageApiError( - ExitCodes.StorageApi.NO_ACTIVE_STORAGE_PROVIDER, - `No active storage node found for bagId: ${bagId}` - ) - } - for (const { dataObjectId, file } of assets) { - try { - this.logger.debug('Uploading asset', { dataObjectId: dataObjectId.toString() }) + for (const attempt of _.range(1, UPLOAD_MAX_ATTEMPTS)) { + // Randomly select one active storage node for given bag + const operator = await this.getRandomActiveStorageNodeInfo(bagId) + if (!operator) { + throw new StorageApiError( + ExitCodes.StorageApi.NO_ACTIVE_STORAGE_PROVIDER, + `No active storage node found for bagId: ${bagId}` + ) + } + const fileStream = await file() + try { + await this.uploadAsset(operator, bagId, dataObjectId.toString(), fileStream) + break // upload successfull, continue with next asset + } catch (error) { + fileStream.destroy() - const formData = new FormData() - formData.append('file', file, 'video.mp4') - await axios.post(`${operator.apiEndpoint}/files`, formData, { - params: { - dataObjectId: dataObjectId.toString(), - storageBucketId: operator.bucketId, - bagId, - }, - maxBodyLength: Infinity, - maxContentLength: Infinity, - maxRedirects: 0, - headers: { - 'content-type': 'multipart/form-data', - ...formData.getHeaders(), - }, - }) - } catch (error) { - // destroy the file stream - file.destroy() + if (axios.isAxiosError(error) && error.response) { + const storageNodeUrl = error.config?.url + const { status, data } = error.response + error = new Error(data?.message) - if (axios.isAxiosError(error) && error.response) { - const storageNodeUrl = error.config?.url - const { status, data } = error.response + this.logger.error(`${storageNodeUrl} - errorCode: ${status}, msg: ${data?.message}`) - if (data?.message?.includes(`Data object ${dataObjectId} already exist`)) { - // No need to throw an error, we can continue with the next asset - continue - } + if (data?.message?.includes(`Data object ${dataObjectId} already exist`)) { + // No need to throw an error, we can continue with the next asset + break + } - this.logger.error(`${storageNodeUrl} - errorCode: ${status}, msg: ${data?.message}`) + if (data?.message?.includes(`Data object ${dataObjectId} doesn't exist in storage bag ${bagId}`)) { + if (attempt < UPLOAD_MAX_ATTEMPTS) { + this.logger.error(`Will retry upload of asset ${dataObjectId} in ${UPLOAD_RETRY_INTERVAL} seconds.`) + await sleep(UPLOAD_RETRY_INTERVAL * 1000) + continue // try again + } + } + } - throw new Error(data?.message) + throw error } - - throw error } } } @@ -115,10 +112,31 @@ export class StorageNodeApi { this.logger.debug( `No storage provider can serve the request yet, retrying in ${retryTime}s (${i + 1}/${retryCount})...` ) - await new Promise((resolve) => setTimeout(resolve, retryTime * 1000)) + await sleep(retryTime * 1000) } } return null } + + async uploadAsset(operator: StorageNodeInfo, bagId: string, dataObjectId: string, file: Readable) { + this.logger.debug('Uploading asset', { dataObjectId }) + + const formData = new FormData() + formData.append('file', file, 'video.mp4') + await axios.post(`${operator.apiEndpoint}/files`, formData, { + params: { + dataObjectId, + storageBucketId: operator.bucketId, + bagId, + }, + maxBodyLength: Infinity, + maxContentLength: Infinity, + maxRedirects: 0, + headers: { + 'content-type': 'multipart/form-data', + ...formData.getHeaders(), + }, + }) + } } diff --git a/src/services/syncProcessing/ContentUploadService.ts b/src/services/syncProcessing/ContentUploadService.ts index 40aff66f..f995b480 100644 --- a/src/services/syncProcessing/ContentUploadService.ts +++ b/src/services/syncProcessing/ContentUploadService.ts @@ -68,8 +68,7 @@ export class ContentUploadService { if ( error instanceof Error && (error.message.includes(`File multihash doesn't match the data object's ipfsContentId`) || - error.message.includes(`File size doesn't match the data object's`) || - error.message.includes(`doesn't exist in storage bag`)) + error.message.includes(`File size doesn't match the data object's`)) ) { console.log('VideoUnavailable::Other', job.data.id) await this.dynamodbService.videos.updateState(job.data, 'VideoUnavailable::Other') diff --git a/src/services/syncProcessing/PriorityQueue.ts b/src/services/syncProcessing/PriorityQueue.ts index c9a24438..07615dcf 100644 --- a/src/services/syncProcessing/PriorityQueue.ts +++ b/src/services/syncProcessing/PriorityQueue.ts @@ -8,6 +8,7 @@ import { DynamodbService } from '../../repository' import { ReadonlyConfig } from '../../types' import { YtChannel, YtVideo } from '../../types/youtube' import { SyncUtils } from './utils' +import sleep from 'sleep-promise' export const QUEUE_NAME_PREFIXES = ['Upload', 'Creation', 'Metadata', 'Download'] as const @@ -192,8 +193,7 @@ class PriorityJobQueue< await this.asyncLock.acquire(this.RECALCULATE_PRIORITY_LOCK_KEY, async () => { // Wait until all processing tasks have completed while (this.processingCount > 0) { - console.log('recalculateJobsPriority', this.queue.name, this.processingCount, this.processingTasks) - await new Promise((resolve) => setTimeout(resolve, 1000)) + await sleep(1000) } const jobs = await this.queue.getJobs(['prioritized']) diff --git a/src/services/syncProcessing/index.ts b/src/services/syncProcessing/index.ts index eb735d2e..caf0a407 100644 --- a/src/services/syncProcessing/index.ts +++ b/src/services/syncProcessing/index.ts @@ -226,7 +226,7 @@ export class ContentProcessingService extends ContentProcessingClient implements // add job flow to the flow producer const jobNode = await this.flowManager.addFlowJob(flowJob) - jobNode.job + return jobNode.job } } })