Skip to content

Commit

Permalink
merge monitor branch and resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
TheodoreKrypton committed Apr 25, 2024
2 parents 264492a + d0c5752 commit d2142d7
Show file tree
Hide file tree
Showing 29 changed files with 721 additions and 216 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"input": "^1.0.1",
"ip": "^1.1.9",
"js-yaml": "^4.1.0",
"jsonwebtoken": "^9.0.2",
"telegraf": "^4.15.0",
"telegram": "^2.18.38",
"uuid": "^9.0.0",
Expand All @@ -31,6 +32,7 @@
"@types/ip": "^1.1.0",
"@types/jest": "^29.5.2",
"@types/js-yaml": "^4.0.5",
"@types/jsonwebtoken": "^9.0.6",
"@types/node": "^20.3.3",
"@types/supertest": "^2.0.12",
"@types/uuid": "^9.0.2",
Expand Down
37 changes: 25 additions & 12 deletions src/api/client/message-api/api.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { Hash, createHash } from 'crypto';
import fs from 'fs';

import bigInt from 'big-integer';

import { IBot, TDLibApi } from 'src/api/interface';
import { config } from 'src/config';
import { TechnicalError } from 'src/errors/base';
import { MessageNotFound } from 'src/errors/telegram';
import { db } from 'src/server/manager/db';
import { manager } from 'src/server/manager';
import { Logger } from 'src/utils/logger';

import { getUploader } from './file-uploader';
Expand Down Expand Up @@ -116,7 +118,10 @@ export class MessageApi extends MessageBroker {
return caption;
}

private static report(uploaded: number, totalSize: number) {
private static report(
uploaded: bigInt.BigInteger,
totalSize: bigInt.BigInteger,
) {
// Logger.info(`${(uploaded / totalSize) * 100}% uploaded`);
}

Expand Down Expand Up @@ -173,20 +178,28 @@ export class MessageApi extends MessageBroker {
name: string,
messageId: number,
): AsyncGenerator<Buffer> {
const task = db.createTask(name, 0, 'download');

let downloaded = 0;
let downloaded: bigInt.BigInteger = bigInt.zero;

for await (const buffer of this.tdlib.account.downloadFile({
const { chunks, size } = await this.tdlib.account.downloadFile({
chatId: this.privateChannelId,
messageId: messageId,
chunkSize: config.tgfs.download.chunk_size_kb,
})) {
yield buffer;
downloaded += buffer.length;
task.reportProgress(downloaded);
}
});

task.finish();
const task = manager.createDownloadTask(name, size);
task.begin();

try {
for await (const buffer of chunks) {
yield buffer;
downloaded = downloaded.add(buffer.length);
task.reportProgress(downloaded);
}
} catch (err) {
task.setErrors([err]);
throw err;
} finally {
task.complete();
}
}
}
60 changes: 36 additions & 24 deletions src/api/client/message-api/file-uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { SendMessageResp, UploadedFile } from 'src/api/types';
import { Queue, generateFileId, getAppropriatedPartSize } from 'src/api/utils';
import { AggregatedError, TechnicalError } from 'src/errors/base';
import { FileTooBig } from 'src/errors/telegram';
import { manager } from 'src/server/manager';
import { Logger } from 'src/utils/logger';

import {
Expand All @@ -20,8 +21,8 @@ import {
GeneralFileMessage,
} from './types';

const isBig = (fileSize: number): boolean => {
return fileSize >= 10 * 1024 * 1024;
const isBig = (fileSize: bigInt.BigInteger): boolean => {
return fileSize.greaterOrEquals(10 * 1024 * 1024);
};

export abstract class FileUploader<T extends GeneralFileMessage> {
Expand All @@ -30,13 +31,13 @@ export abstract class FileUploader<T extends GeneralFileMessage> {

private isBig: boolean;
private partCnt: number = 0;
private uploaded: number = 0;
private uploaded: bigInt.BigInteger = bigInt.zero;

private _errors: { [key: number]: Error } = {};

constructor(
protected readonly client: ITDLibClient,
protected readonly fileSize: number,
protected readonly fileSize: bigInt.BigInteger,
) {
this.fileId = generateFileId();
this.isBig = isBig(fileSize);
Expand All @@ -47,31 +48,34 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
protected close(): void {}
protected abstract read(length: number): Promise<Buffer>;

private get chunkSize(): number {
return getAppropriatedPartSize(bigInt(this.fileSize)) * 1024;
private get chunkSize(): bigInt.BigInteger {
return bigInt(getAppropriatedPartSize(this.fileSize) * 1024);
}

private get parts(): number {
return Math.ceil(this.fileSize / this.chunkSize);
const { quotient, remainder } = this.fileSize.divmod(this.chunkSize);
return remainder.equals(0)
? quotient.toJSNumber()
: quotient.toJSNumber() + 1;
}

private async uploadNextPart(workerId: number): Promise<number> {
private async uploadNextPart(workerId: number): Promise<bigInt.BigInteger> {
if (this.done()) {
return 0;
return bigInt.zero;
}

const chunkLength =
this.uploaded + this.chunkSize > this.fileSize
? this.fileSize - this.uploaded
this.uploaded.add(this.chunkSize) > this.fileSize
? this.fileSize.minus(this.uploaded)
: this.chunkSize;
this.uploaded += chunkLength;
this.uploaded = this.uploaded.add(chunkLength);
const filePart = this.partCnt++; // 0-indexed

if (chunkLength === 0) {
return 0;
if (chunkLength.eq(0)) {
return bigInt.zero;
}

const chunk = await this.read(chunkLength);
const chunk = await this.read(chunkLength.toJSNumber());

let retry = 3;
while (retry) {
Expand Down Expand Up @@ -115,7 +119,10 @@ export abstract class FileUploader<T extends GeneralFileMessage> {

public async upload(
file: T,
callback?: (uploaded: number, totalSize: number) => void,
callback?: (
uploaded: bigInt.BigInteger,
totalSize: bigInt.BigInteger,
) => void,
fileName?: string,
workers: {
small?: number;
Expand All @@ -125,6 +132,7 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
big: 15,
},
): Promise<void> {
const task = manager.createUploadTask(this.fileName, this.fileSize);
this.prepare(file);
try {
this.fileName = fileName ?? this.defaultFileName;
Expand All @@ -135,9 +143,10 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
const partSize = await this.uploadNextPart(workerId);
if (partSize && callback) {
Logger.info(
`[worker ${workerId}] ${
(this.uploaded * 100) / this.fileSize
}% uploaded`,
`[worker ${workerId}] ${this.uploaded
.multiply(100)
.divide(this.fileSize)
.toJSNumber()}% uploaded`,
);
callback(this.uploaded, this.fileSize);
}
Expand All @@ -159,6 +168,9 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
await Promise.all(promises);
} finally {
this.close();

task.errors = this.errors;
task.complete();
}
}

Expand Down Expand Up @@ -319,19 +331,19 @@ export function getUploader(
tdlib: TDLibApi,
fileMsg: GeneralFileMessage,
): FileUploader<GeneralFileMessage> {
const selectApi = (fileSize: number) => {
const selectApi = (fileSize: bigInt.BigInteger) => {
// bot cannot upload files larger than 50MB
return fileSize > 50 * 1024 * 1024 ? tdlib.account : tdlib.bot;
return fileSize.greater(50 * 1024 * 1024) ? tdlib.account : tdlib.bot;
};

if ('path' in fileMsg) {
const fileSize = fs.statSync(fileMsg.path).size;
const fileSize = bigInt(fs.statSync(fileMsg.path).size);
return new UploaderFromPath(selectApi(fileSize), fileSize);
} else if ('buffer' in fileMsg) {
const fileSize = fileMsg.buffer.length;
const fileSize = bigInt(fileMsg.buffer.length);
return new UploaderFromBuffer(selectApi(fileSize), fileSize);
} else if ('stream' in fileMsg) {
const fileSize = fileMsg.size;
const fileSize = bigInt(fileMsg.size);
return new UploaderFromStream(selectApi(fileSize), fileSize);
}
}
36 changes: 23 additions & 13 deletions src/api/impl/gramjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,9 @@ export class GramJSApi implements ITDLibClient {
};
}

public async *downloadFile(
public async downloadFile(
req: types.DownloadFileReq,
): types.DownloadFileResp {
): Promise<types.DownloadFileResp> {
const message = (
await this.getMessages({
chatId: req.chatId,
Expand All @@ -250,17 +250,27 @@ export class GramJSApi implements ITDLibClient {
const chunkSize = req.chunkSize * 1024;

let i = 0;
for await (const chunk of this.client.iterDownload({
file: new Api.InputDocumentFileLocation({
id: message.document.id,
accessHash: message.document.accessHash,
fileReference: message.document.fileReference,
thumbSize: '',
}),
requestSize: chunkSize,
})) {
i += 1;
yield Buffer.from(chunk);

const client = this.client;

async function* chunks() {
for await (const chunk of client.iterDownload({
file: new Api.InputDocumentFileLocation({
id: message.document.id,
accessHash: message.document.accessHash,
fileReference: message.document.fileReference,
thumbSize: '',
}),
requestSize: chunkSize,
})) {
i += 1;
yield Buffer.from(chunk);
}
}

return {
chunks: chunks(),
size: message.document.size,
};
}
}
2 changes: 1 addition & 1 deletion src/api/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export interface ITDLibClient {

editMessageMedia(req: types.EditMessageMediaReq): Promise<types.Message>;

downloadFile(req: types.DownloadFileReq): types.DownloadFileResp;
downloadFile(req: types.DownloadFileReq): Promise<types.DownloadFileResp>;
}

export interface IBot {}
Expand Down
5 changes: 4 additions & 1 deletion src/api/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,7 @@ export type DownloadFileReq = Chat &
chunkSize: number;
};

export type DownloadFileResp = AsyncGenerator<Buffer>;
export type DownloadFileResp = {
chunks: AsyncGenerator<Buffer>;
size: bigInt.BigInteger;
};
25 changes: 25 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ export type Config = {
token: string;
chat_id: number;
};
jwt: {
secret: string;
algorithm: string;
life: number;
};
};
};

Expand Down Expand Up @@ -100,6 +105,11 @@ export const loadConfig = (configPath: string): Config => {
token: cfg['manager']['bot']['token'],
chat_id: cfg['manager']['bot']['chat_id'],
},
jwt: {
secret: cfg['manager']['jwt']['secret'],
algorithm: cfg['manager']['jwt']['algorithm'] ?? 'HS256',
life: cfg['manager']['jwt']['life'],
},
},
};
return config;
Expand Down Expand Up @@ -127,6 +137,16 @@ export const createConfig = async (): Promise<string> => {
{ default: path.join(process.cwd(), 'config.yaml') },
);

const generateRandomSecret = () => {
const chars =
'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
let secret = '';
for (let i = 0; i < 64; i++) {
secret += chars.charAt(Math.floor(Math.random() * chars.length));
}
return secret;
};

const res: Config = {
telegram: {
api_id: Number(
Expand Down Expand Up @@ -189,6 +209,11 @@ export const createConfig = async (): Promise<string> => {
token: '',
chat_id: 0,
},
jwt: {
secret: generateRandomSecret(),
algorithm: 'HS256',
life: 3600 * 24 * 7,
},
},
};

Expand Down
42 changes: 42 additions & 0 deletions src/errors/authentication.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import HTTPErrors from 'http-errors';

import { BusinessError } from './base';

export class BadAuthentication extends BusinessError {
constructor(
public readonly message: string,
public readonly cause?: string,
) {
super(message, 'BAD_AUTHENTICATION', cause, HTTPErrors.Unauthorized);
}
}

export class MissingAuthenticationHeaders extends BadAuthentication {
constructor() {
super('Missing authentication headers', 'Missing authentication headers');
}
}

export class InvalidCredentials extends BadAuthentication {
constructor(public readonly cause: string) {
super('Bad authentication', cause);
}
}

export class UserNotFound extends InvalidCredentials {
constructor(public readonly username: string) {
super(`User ${username} not found`);
}
}

export class IncorrectPassword extends InvalidCredentials {
constructor(public readonly username: string) {
super(`Password for ${username} does not match`);
}
}

export class JWTInvalid extends InvalidCredentials {
constructor(public readonly cause: string) {
super(`JWT token invalid, ${cause}`);
}
}
Loading

0 comments on commit d2142d7

Please sign in to comment.