Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
TheodoreKrypton committed May 11, 2024
1 parent fdfff82 commit 0fb1922
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 36 deletions.
99 changes: 64 additions & 35 deletions src/api/client/message-api/file-uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import path from 'path';
import { ITDLibClient, TDLibApi } from 'src/api/interface';
import { SendMessageResp, UploadedFile } from 'src/api/types';
import { Queue, generateFileId, getAppropriatedPartSize } from 'src/api/utils';
import { AggregatedError, TechnicalError } from 'src/errors/base';
import { AggregatedError } from 'src/errors/base';
import { FileTooBig } from 'src/errors/telegram';
import { manager } from 'src/server/manager';
import { Logger } from 'src/utils/logger';
Expand All @@ -35,13 +35,25 @@ export abstract class FileUploader<T extends GeneralFileMessage> {

private _errors: { [key: number]: Error } = {};

private _uploadingChunks: Array<{ chunk: Buffer; filePart: number }>;

constructor(
protected readonly client: ITDLibClient,
protected readonly fileSize: bigInt.BigInteger,
private readonly onComplete: () => Promise<void>,
private readonly workers: {
small?: number;
big?: number;
} = {
small: 3,
big: 15,
},
) {
this.fileId = generateFileId();
this.isBig = isBig(fileSize);
this._uploadingChunks = new Array(
this.isBig ? this.workers.big : this.workers.small,
);
}

protected abstract get defaultFileName(): string;
Expand All @@ -60,24 +72,11 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
: quotient.toJSNumber() + 1;
}

private async uploadNextPart(workerId: number): Promise<bigInt.BigInteger> {
if (this.done()) {
return bigInt.zero;
}

const chunkLength =
this.uploaded.add(this.chunkSize) > this.fileSize
? this.fileSize.minus(this.uploaded)
: this.chunkSize;
this.uploaded = this.uploaded.add(chunkLength);
const filePart = this.partCnt++; // 0-indexed

if (chunkLength.eq(0)) {
return bigInt.zero;
}

const chunk = await this.read(chunkLength.toJSNumber());

private async saveChunk(
workerId: number,
chunk: Buffer,
filePart: number,
): Promise<void> {
while (true) {
try {
const rsp = this.isBig
Expand All @@ -92,39 +91,68 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
filePart,
bytes: chunk,
});
Logger.debug(
`[worker ${workerId}] uploaded chunk ${filePart} success=${rsp.success}`,
);
if (!rsp.success) {
throw new TechnicalError(
`File chunk ${filePart} of ${this.fileName} failed to upload`,
);
if (rsp.success) {
this._uploadingChunks[workerId] = null;
return;
}
return chunkLength;
} catch (err) {
if (err instanceof RPCError) {
if (err.errorMessage === 'FILE_PARTS_INVALID') {
throw new FileTooBig(this.fileSize);
}
}
console.error(err);
}
}
}

private async saveUncompletedChunks() {
while (true) {
const unfinishedChunks = this._uploadingChunks.filter((x) => x !== null);

if (unfinishedChunks.length === 0) {
break;
}

for (const { chunk, filePart } of unfinishedChunks) {
if (chunk) {
await this.saveChunk(0, chunk, filePart);
}
}
}
}

private async uploadNextPart(workerId: number): Promise<bigInt.BigInteger> {
if (this.done()) {
return bigInt.zero;
}

const chunkLength =
this.uploaded.add(this.chunkSize) > this.fileSize
? this.fileSize.minus(this.uploaded)
: this.chunkSize;
this.uploaded = this.uploaded.add(chunkLength);
const filePart = this.partCnt++; // 0-indexed

if (chunkLength.eq(0)) {
return bigInt.zero;
}

const chunk = await this.read(chunkLength.toJSNumber());
this._uploadingChunks[workerId] = { chunk, filePart };

await this.saveChunk(workerId, chunk, filePart);

return chunkLength;
}

public async upload(
file: T,
callback?: (
uploaded: bigInt.BigInteger,
totalSize: bigInt.BigInteger,
) => void,
fileName?: string,
workers: {
small?: number;
big?: number;
} = {
small: 3,
big: 15,
},
): Promise<void> {
const task = manager.createUploadTask(this.fileName, this.fileSize);
this.prepare(file);
Expand All @@ -149,6 +177,7 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
}
if (!onCompleteEmitted) {
onCompleteEmitted = true;
await this.saveUncompletedChunks();
await this.onComplete();
}
return true;
Expand All @@ -160,7 +189,7 @@ export abstract class FileUploader<T extends GeneralFileMessage> {

const promises: Array<Promise<boolean>> = [];

const numWorkers = this.isBig ? workers.big : workers.small;
const numWorkers = this.isBig ? this.workers.big : this.workers.small;
for (let i = 0; i < numWorkers; i++) {
promises.push(createWorker(i));
}
Expand Down
2 changes: 2 additions & 0 deletions src/api/ops/create-dir.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { Client } from 'src/api';
import { RelativePathError } from 'src/errors/path';
import { Logger } from 'src/utils/logger';

import { navigateToDir } from './navigate-to-dir';
import { splitPath } from './utils';

export const createDir =
(client: Client) => async (path: string, parents: boolean) => {
Logger.info(`Creating directory ${path}`);
if (!parents) {
const [basePath, name] = splitPath(path);
const dir = await navigateToDir(client)(basePath);
Expand Down
2 changes: 1 addition & 1 deletion src/server/webdav/tgfs-filesystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ export class TGFSFileSystem extends FileSystem {
fileRef,
fileRef.name,
);

callback(null, Readable.from(chunks));
} catch (err) {
handleError(callback)(err);
Expand Down

0 comments on commit 0fb1922

Please sign in to comment.