Skip to content

Commit

Permalink
added flow control, include file size in file descriptor
Browse files Browse the repository at this point in the history
  • Loading branch information
TheodoreKrypton committed May 25, 2024
1 parent c3ff352 commit 0a17e51
Show file tree
Hide file tree
Showing 27 changed files with 308 additions and 144 deletions.
22 changes: 17 additions & 5 deletions src/api/client/directory-api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { DirectoryIsNotEmptyError } from 'src/errors/path';
import { TGFSDirectory } from 'src/model/directory';
import { FileOrDirectoryDoesNotExistError } from 'src/errors/path';
import { TGFSDirectory, TGFSFileRef } from 'src/model/directory';
import { validateName } from 'src/utils/validate-name';

import { MetaDataApi } from './metadata-api';
Expand All @@ -24,11 +25,22 @@ export class DirectoryApi extends MetaDataApi {
return newDirectory;
}

public ls(
dir: TGFSDirectory,
fileName?: string,
): TGFSFileRef | Array<TGFSDirectory | TGFSFileRef> {
if (fileName) {
const file = dir.findFile(fileName);
if (file) {
return file;
}
throw new FileOrDirectoryDoesNotExistError(fileName, 'list');
}
return [...dir.findDirs(), ...dir.findFiles()];
}

public async deleteEmptyDirectory(directory: TGFSDirectory) {
if (
directory.findChildren().length > 0 ||
directory.findFiles().length > 0
) {
if (directory.findDirs().length > 0 || directory.findFiles().length > 0) {
throw new DirectoryIsNotEmptyError();
}
await this.dangerouslyDeleteDirectory(directory);
Expand Down
78 changes: 46 additions & 32 deletions src/api/client/file-desc-api.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { MessageNotFound } from 'src/errors/telegram';
import { TGFSFileRef } from 'src/model/directory';
import { TGFSFile } from 'src/model/file';
import { TGFSFile, TGFSFileVersion } from 'src/model/file';
import { Logger } from 'src/utils/logger';

import { MessageApi } from './message-api';
Expand Down Expand Up @@ -34,22 +34,19 @@ export class FileDescApi extends MessageApi {
}

public async createFileDesc(fileMsg: GeneralFileMessage): Promise<number> {
const tgfsFile = new TGFSFile(fileMsg.name);
const fd = new TGFSFile(fileMsg.name);

if ('empty' in fileMsg) {
tgfsFile.addEmptyVersion();
fd.addEmptyVersion();
} else {
const id = await this.sendFile(fileMsg);
tgfsFile.addVersionFromFileMessageId(id);
const sentFileMsg = await this.sendFile(fileMsg);
fd.addVersionFromSentFileMessage(sentFileMsg);
}

return await this.sendFileDesc(tgfsFile);
return await this.sendFileDesc(fd);
}

public async getFileDesc(
fileRef: TGFSFileRef,
withVersionInfo: boolean = true,
): Promise<TGFSFile> {
public async getFileDesc(fileRef: TGFSFileRef): Promise<TGFSFile> {
const message = (await this.getMessages([fileRef.getMessageId()]))[0];

if (!message) {
Expand All @@ -63,25 +60,31 @@ export class FileDescApi extends MessageApi {

const fileDesc = TGFSFile.fromObject(JSON.parse(message.text));

if (withVersionInfo) {
const versions = Object.values(fileDesc.versions);
const versions = Object.values(fileDesc.versions);

const nonEmptyVersions = versions.filter(
(version) => version.messageId > 0,
); // may contain empty versions
const nonEmptyVersions = versions.filter(
(version) => version.messageId != TGFSFileVersion.EMPTY_FILE,
); // may contain empty versions

const fileMessages = await this.getMessages(
nonEmptyVersions.map((version) => version.messageId),
);
const versionsWithoutSizeInfo = nonEmptyVersions.filter(
(version) => version.size == TGFSFileVersion.INVALID_FILE_SIZE,
);

const fileMessages = await this.getMessages(
versionsWithoutSizeInfo.map((version) => version.messageId),
);

versionsWithoutSizeInfo.forEach((version, i) => {
const fileMessage = fileMessages[i];
if (fileMessage) {
version.size = Number(fileMessage.document.size);
} else {
version.setInvalid();
}
});

nonEmptyVersions.forEach((version, i) => {
const fileMessage = fileMessages[i];
if (fileMessage) {
version.size = Number(fileMessage.document.size);
} else {
version.setInvalid();
}
});
if (versionsWithoutSizeInfo.length > 0) {
await this.updateFileDesc(fileRef.getMessageId(), fileDesc);
}

return fileDesc;
Expand All @@ -91,17 +94,27 @@ export class FileDescApi extends MessageApi {
fr: TGFSFileRef,
fileMsg: GeneralFileMessage,
): Promise<number> {
const fd = await this.getFileDesc(fr, false);
const fd = await this.getFileDesc(fr);

if (isFileMessageEmpty(fileMsg)) {
fd.addEmptyVersion();
} else {
const messageId = await this.sendFile(fileMsg);
fd.addVersionFromFileMessageId(messageId);
const sentFileMsg = await this.sendFile(fileMsg);
fd.addVersionFromSentFileMessage(sentFileMsg);
}
return await this.sendFileDesc(fd, fr.getMessageId());
}

public async updateFileDesc(
messageId: number,
fileDesc: TGFSFile,
): Promise<number> {
return await this.editMessageText(
messageId,
JSON.stringify(fileDesc.toObject()),
);
}

public async updateFileVersion(
fr: TGFSFileRef,
fileMsg: GeneralFileMessage,
Expand All @@ -113,9 +126,10 @@ export class FileDescApi extends MessageApi {
fv.setInvalid();
fd.updateVersion(fv);
} else {
const messageId = await this.sendFile(fileMsg);
const sentFileMsg = await this.sendFile(fileMsg);
const fv = fd.getVersion(versionId);
fv.messageId = messageId;
fv.messageId = sentFileMsg.messageId;
fv.size = sentFileMsg.size.toJSNumber();
fd.updateVersion(fv);
}
return await this.sendFileDesc(fd, fr.getMessageId());
Expand All @@ -125,7 +139,7 @@ export class FileDescApi extends MessageApi {
fr: TGFSFileRef,
versionId: string,
): Promise<number> {
const fd = await this.getFileDesc(fr, false);
const fd = await this.getFileDesc(fr);
fd.deleteVersion(versionId);
return await this.sendFileDesc(fd, fr.getMessageId());
}
Expand Down
51 changes: 39 additions & 12 deletions src/api/client/message-api/api.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import { Hash, createHash } from 'crypto';
import fs from 'fs';

import { RPCError } from 'telegram/errors';

import bigInt from 'big-integer';

import { IBot, TDLibApi } from 'src/api/interface';
import { SentFileMessage } from 'src/api/types';
import { config } from 'src/config';
import { TechnicalError } from 'src/errors/base';
import { MessageNotFound } from 'src/errors/telegram';
import { TGFSFileVersion } from 'src/model/file';
import { manager } from 'src/server/manager';
import { flowControl } from 'src/utils/flow-control';
import { Logger } from 'src/utils/logger';

import { getUploader } from './file-uploader';
Expand All @@ -29,6 +33,7 @@ export class MessageApi extends MessageBroker {
super(tdlib);
}

@flowControl()
protected async sendText(message: string): Promise<number> {
return (
await this.tdlib.bot.sendText({
Expand All @@ -38,6 +43,7 @@ export class MessageApi extends MessageBroker {
).messageId;
}

@flowControl()
protected async editMessageText(
messageId: number,
message: string,
Expand All @@ -51,14 +57,19 @@ export class MessageApi extends MessageBroker {
})
).messageId;
} catch (err) {
if (err.message === 'message to edit not found') {
throw new MessageNotFound(messageId);
} else {
throw err;
if (err instanceof RPCError) {
if (err.message === 'message to edit not found') {
throw new MessageNotFound(messageId);
}
if (err.errorMessage === 'MESSAGE_NOT_MODIFIED') {
return messageId;
}
}
throw err;
}
}

@flowControl()
protected async editMessageMedia(
messageId: number,
buffer: Buffer,
Expand Down Expand Up @@ -128,7 +139,9 @@ export class MessageApi extends MessageBroker {
// Logger.info(`${(uploaded / totalSize) * 100}% uploaded`);
}

private async _sendFile(fileMsg: GeneralFileMessage): Promise<number> {
private async _sendFile(
fileMsg: GeneralFileMessage,
): Promise<SentFileMessage> {
let messageId = TGFSFileVersion.EMPTY_FILE;
const uploader = getUploader(this.tdlib, fileMsg, async () => {
messageId = (
Expand All @@ -138,13 +151,23 @@ export class MessageApi extends MessageBroker {
)
).messageId;
});
await uploader.upload(fileMsg, MessageApi.report, fileMsg.name);
const size = await uploader.upload(
fileMsg,
MessageApi.report,
fileMsg.name,
);

Logger.debug('File sent', JSON.stringify(fileMsg));
return messageId;

return {
messageId,
size,
};
}

protected async sendFile(fileMsg: GeneralFileMessage): Promise<number> {
protected async sendFile(
fileMsg: GeneralFileMessage,
): Promise<SentFileMessage> {
if ('stream' in fileMsg) {
// Unable to calculate sha256 for file as a stream. So just send it.
Logger.debug(
Expand All @@ -159,17 +182,21 @@ export class MessageApi extends MessageBroker {

fileMsg.tags = { sha256: fileHash };

const existingFile = await this.tdlib.account.searchMessages({
const existingFileMsg = await this.tdlib.account.searchMessages({
chatId: this.privateChannelId,
search: `#sha256IS${fileHash}`,
});

if (existingFile.length > 0) {
if (existingFileMsg.length > 0) {
const msg = existingFileMsg[0];
Logger.debug(
`Found file with the same sha256 ${fileHash}, skip uploading`,
JSON.stringify(existingFile[0]),
JSON.stringify(msg),
);
return existingFile[0].messageId;
return {
messageId: msg.messageId,
size: msg.document.size,
};
}

return await this._sendFile(fileMsg);
Expand Down
7 changes: 5 additions & 2 deletions src/api/client/message-api/file-uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import path from 'path';

import { ITDLibClient, TDLibApi } from 'src/api/interface';
import { SendMessageResp, UploadedFile } from 'src/api/types';
import { Queue, generateFileId, getAppropriatedPartSize } from 'src/api/utils';
import { generateFileId, getAppropriatedPartSize } from 'src/api/utils';
import { AggregatedError } from 'src/errors/base';
import { FileTooBig } from 'src/errors/telegram';
import { manager } from 'src/server/manager';
import { Logger } from 'src/utils/logger';
import { Queue } from 'src/utils/queue';

import {
FileMessageFromBuffer,
Expand Down Expand Up @@ -153,7 +154,7 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
totalSize: bigInt.BigInteger,
) => void,
fileName?: string,
): Promise<void> {
): Promise<bigInt.BigInteger> {
const task = manager.createUploadTask(this.fileName, this.fileSize);
this.prepare(file);
try {
Expand Down Expand Up @@ -200,6 +201,8 @@ export abstract class FileUploader<T extends GeneralFileMessage> {

task.errors = this.errors;
task.complete();

return this.fileSize;
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/api/client/message-api/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Readable } from 'stream';

import { Message } from 'src/api/types';

export type FileTags = {
sha256: string;
};
Expand Down
20 changes: 18 additions & 2 deletions src/api/client/metadata-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { FileDescApi } from './file-desc-api';

export class MetaDataApi extends FileDescApi {
private metadata: TGFSMetadata;
private updateMetaDataTimeout: NodeJS.Timeout = null;

protected async initMetadata() {
this.metadata = await this.getMetadata();
Expand Down Expand Up @@ -43,7 +44,6 @@ export class MetaDataApi extends FileDescApi {

protected async syncMetadata() {
this.metadata.syncWith(await this.getMetadata());

await this.updateMetadata();
}

Expand All @@ -57,9 +57,25 @@ export class MetaDataApi extends FileDescApi {
'metadata.json',
'',
);
// return new Promise((resolve, reject) => {
// if (this.updateMetaDataTimeout) {
// clearTimeout(this.updateMetaDataTimeout);
// }
// this.updateMetaDataTimeout = setTimeout(async () => {
// try {

// resolve(undefined);
// } catch (err) {
// reject(err);
// }
// }, 1000);
// });
} else {
// doesn't exist, create new metadata and pin
const messageId = await this.sendFile({ buffer, name: 'metadata.json' });
const { messageId } = await this.sendFile({
buffer,
name: 'metadata.json',
});
this.metadata.msgId = messageId;
await this.pinMessage(messageId);
}
Expand Down
Loading

0 comments on commit 0a17e51

Please sign in to comment.