Skip to content

Commit

Permalink
wait for 500ms before sending the file after uploading all the chunks…
Browse files Browse the repository at this point in the history
…; manually fires 'finish' event after the file is successfully uploaded
  • Loading branch information
TheodoreKrypton committed Jun 26, 2024
1 parent 5ce2497 commit 7384c0f
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 37 deletions.
72 changes: 39 additions & 33 deletions src/api/client/message-api/file-uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { FileTooBig } from 'src/errors/telegram';
import { manager } from 'src/server/manager';
import { Logger } from 'src/utils/logger';
import { Queue } from 'src/utils/queue';
import { sleep } from 'src/utils/sleep';

import {
FileMessageFromBuffer,
Expand All @@ -32,7 +33,8 @@ export abstract class FileUploader<T extends GeneralFileMessage> {

private isBig: boolean;
private partCnt: number = 0;
private uploaded: bigInt.BigInteger = bigInt.zero;
private readSize: bigInt.BigInteger = bigInt.zero;
private uploadedSize: bigInt.BigInteger = bigInt.zero;

private _errors: { [key: number]: Error } = {};

Expand Down Expand Up @@ -73,11 +75,13 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
: quotient.toJSNumber() + 1;
}

private async saveChunk(
private async uploadChunk(
workerId: number,
chunk: Buffer,
filePart: number,
): Promise<void> {
this._uploadingChunks[workerId] = { chunk, filePart };

while (true) {
try {
const rsp = this.isBig
Expand All @@ -94,6 +98,7 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
});
if (rsp.success) {
this._uploadingChunks[workerId] = null;
this.uploadedSize = this.uploadedSize.add(chunk.length);
return;
}
} catch (err) {
Expand All @@ -102,22 +107,21 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
throw new FileTooBig(this.fileSize);
}
}
console.error(err);
Logger.error(`${this.fileName} ${err}`);
}
}
}

private async saveUncompletedChunks() {
while (true) {
const unfinishedChunks = this._uploadingChunks.filter((x) => x !== null);

if (unfinishedChunks.length === 0) {
const unCompletedChunks = this._uploadingChunks.filter((x) => x !== null);
if (unCompletedChunks.length === 0) {
break;
}

for (const { chunk, filePart } of unfinishedChunks) {
for (const { chunk, filePart } of unCompletedChunks) {
if (chunk) {
await this.saveChunk(0, chunk, filePart);
await this.uploadChunk(0, chunk, filePart);
}
}
}
Expand All @@ -129,20 +133,19 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
}

const chunkLength =
this.uploaded.add(this.chunkSize) > this.fileSize
? this.fileSize.minus(this.uploaded)
this.readSize.add(this.chunkSize) > this.fileSize
? this.fileSize.minus(this.readSize)
: this.chunkSize;
this.uploaded = this.uploaded.add(chunkLength);
this.readSize = this.readSize.add(chunkLength);
const filePart = this.partCnt++; // 0-indexed

if (chunkLength.eq(0)) {
return bigInt.zero;
}

const chunk = await this.read(chunkLength.toJSNumber());
this._uploadingChunks[workerId] = { chunk, filePart };

await this.saveChunk(workerId, chunk, filePart);
await this.uploadChunk(workerId, chunk, filePart);

return chunkLength;
}
Expand All @@ -160,42 +163,43 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
try {
this.fileName = fileName ?? this.defaultFileName;

let onCompleteEmitted = false;

const createWorker = async (workerId: number): Promise<boolean> => {
try {
while (!this.done()) {
const partSize = await this.uploadNextPart(workerId);
Logger.info(
`[worker ${workerId}] ${this.uploadedSize
.multiply(100)
.divide(this.fileSize)
.toJSNumber()}% uploaded ${this.fileId}(${this.fileName})`,
);

if (partSize && callback) {
Logger.info(
`[worker ${workerId}] ${this.uploaded
.multiply(100)
.divide(this.fileSize)
.toJSNumber()}% uploaded ${this.fileId}`,
);
callback(this.uploaded, this.fileSize);
callback(this.readSize, this.fileSize);
}
}
if (!onCompleteEmitted) {
onCompleteEmitted = true;
await this.saveUncompletedChunks();
await this.onComplete();
}

return true;
} catch (err) {
this._errors[workerId] = err;
return false;
}
};

const promises: Array<Promise<boolean>> = [];
while (this.uploadedSize < this.fileSize) {
const promises: Array<Promise<boolean>> = [];

const numWorkers = this.isBig ? this.workers.big : this.workers.small;
for (let i = 0; i < numWorkers; i++) {
promises.push(createWorker(i));
const numWorkers = this.isBig ? this.workers.big : this.workers.small;
for (let i = 0; i < numWorkers; i++) {
promises.push(createWorker(i));
}

await Promise.all(promises);
await this.saveUncompletedChunks();
}

await Promise.all(promises);
await sleep(500); // sleep 500ms before sending the file message, otherwise Telegram reports FILE_PART_0_MISSING error.
await this.onComplete();
} finally {
this.close();

Expand All @@ -207,7 +211,7 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
}

private done(): boolean {
return this.uploaded >= this.fileSize;
return this.readSize >= this.fileSize;
}

public get errors(): Array<Error> {
Expand All @@ -218,6 +222,8 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
chatId: number,
caption?: string,
): Promise<SendMessageResp> {
Logger.info(`sending file ${this.fileName}`);

if (Object.keys(this._errors).length > 0) {
throw new AggregatedError(this.errors);
}
Expand Down
8 changes: 6 additions & 2 deletions src/server/webdav/tgfs-filesystem.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PassThrough, Readable, Writable } from 'stream';
import { Readable, Writable } from 'stream';

import {
CreateInfo,
Expand Down Expand Up @@ -30,6 +30,8 @@ import { TGFSDirectory, TGFSFileRef } from 'src/model/directory';
import { TGFSFile } from 'src/model/file';
import { Logger } from 'src/utils/logger';

import { UploadStream } from './upload-stream';

export class TGFSSerializer implements FileSystemSerializer {
constructor(private readonly client: Client) {}

Expand Down Expand Up @@ -269,7 +271,7 @@ export class TGFSFileSystem extends VirtualFileSystem {
const tgClient = this.tgClient;
const { estimatedSize } = ctx;

const stream = new PassThrough();
const stream = new UploadStream();

callback(null, stream);

Expand All @@ -290,6 +292,8 @@ export class TGFSFileSystem extends VirtualFileSystem {
);

this.resources[path.toString()] = TGFSFileResource.fromFileDesc(fd);

stream.finish();
} catch (err) {
stream.destroy();
throw err;
Expand Down
13 changes: 13 additions & 0 deletions src/server/webdav/upload-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { PassThrough, TransformOptions } from 'stream';

export class UploadStream extends PassThrough {
constructor(options?: TransformOptions) {
super(options);

this.on('end', () => {});
}

finish() {
this.emit('finish');
}
}
3 changes: 1 addition & 2 deletions src/utils/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ export function retry(
const result = await originalMethod.apply(this, args);
return result;
} catch (error) {
console.error(error);
if (i === retries) throw error;
const waitTime = Math.pow(2, i) * backoff;
Logger.error(
`Method ${String(propertyKey)}: Attempt ${i + 1} failed. Retrying in ${waitTime}ms...`,
`Method ${String(propertyKey)}: Attempt ${i + 1} failed because of ${error}. Retrying in ${waitTime}ms...`,
);
await new Promise((resolve) => setTimeout(resolve, waitTime));
}
Expand Down

0 comments on commit 7384c0f

Please sign in to comment.