Skip to content

Commit

Permalink
refactor: 매수 로직 리팩토링
Browse files Browse the repository at this point in the history
for문-> promise로 교체 & 백그라운드 거래 체결 로직 동기적 -> 비동기적 으로 수정
위 과정을 진행하다가 커넥션 풀 문제가 생겨서 connectionLimit 10으로 수정
  • Loading branch information
SeungGwan123 committed Jan 21, 2025
1 parent c1e2677 commit 3c4037f
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 90 deletions.
3 changes: 2 additions & 1 deletion packages/server/src/account/account.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Module, forwardRef } from '@nestjs/common';
import { Logger, Module, forwardRef } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { Account } from './account.entity';
import { AccountRepository } from './account.repository';
Expand All @@ -18,6 +18,7 @@ import { CoinListService } from '@src/upbit/coin-list.service';
AssetRepository,
AssetService,
CoinListService,
Logger
],
})
export class AccountModule {}
24 changes: 13 additions & 11 deletions packages/server/src/account/account.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,20 @@ export class AccountRepository extends Repository<Account> {
}
}

async validateUserAccount(userId: number): Promise<Account> {
this.logger.log(`사용자 계정 검증 시작: userId=${userId}`);
const userAccount = await this.findOne({
where: { user: { id: userId } },
});

if (!userAccount) {
this.logger.warn(`존재하지 않는 사용자 계정: userId=${userId}`);
throw new UnprocessableEntityException('유저가 존재하지 않습니다.');
async validateUserAccount(userId: number, queryRunner): Promise<Account> {
try{
this.logger.log(`사용자 계정 검증 시작: userId=${userId}`);
const userAccount = await queryRunner.manager.findOne(Account, {
where: { user: { id: userId } },
});
if (!userAccount) {
this.logger.warn(`존재하지 않는 사용자 계정: userId=${userId}`);
throw new UnprocessableEntityException('유저가 존재하지 않습니다.');
}
return userAccount;
} catch (error) {
this.logger.error(`계좌 조회 실패: ${error.message}`, error.stack);
}

return userAccount;
}

async getAccount(id: number, queryRunner: QueryRunner): Promise<Account> {
Expand Down
3 changes: 2 additions & 1 deletion packages/server/src/asset/asset.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Module } from '@nestjs/common';
import { Logger, Module } from '@nestjs/common';
import { AssetService } from './asset.service';
import { AssetRepository } from './asset.repository';
import { TypeOrmModule } from '@nestjs/typeorm';
Expand All @@ -11,6 +11,7 @@ import { HttpModule } from '@nestjs/axios';
providers: [
AssetService,
AssetRepository,
Logger
],
exports: [AssetRepository],
})
Expand Down
28 changes: 26 additions & 2 deletions packages/server/src/auth/user.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,32 @@ export class UserRepository extends Repository<User> {
throw error;
}
}
async validateUser(userId: number): Promise<User> {
const user = await this.getUser(userId);

async getUserByQueryRunner(userId: number, queryRunner): Promise<User> {
try {
const user = await queryRunner.manager.findOne(User,{
where: { id: userId },
});

if (!user) {
this.logger.warn('User not found', { userId });
throw new UserNotFoundException(userId);
}

return user;
} catch (error) {
if (!(error instanceof UserNotFoundException)) {
this.logger.error('Failed to fetch user', {
userId,
error: error.stack,
});
}
throw error;
}
}

async validateUser(userId: number, queryRunner): Promise<User> {
const user = await this.getUserByQueryRunner(userId, queryRunner);

if (!user) {
throw new UserNotFoundException(userId);
Expand Down
4 changes: 4 additions & 0 deletions packages/server/src/configs/typeorm.config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { TypeOrmModuleOptions } from '@nestjs/typeorm';
import { connect } from 'http2';

export default function getTypeOrmConfig(): TypeOrmModuleOptions {
return {
Expand All @@ -11,5 +12,8 @@ export default function getTypeOrmConfig(): TypeOrmModuleOptions {
entities: [__dirname + '/../**/*.entity.{js,ts}'],
synchronize: process.env.DB_SYNCHRONIZE === 'true',
dropSchema: process.env.DB_DROPSCHEMA === 'true',
extra:{
connectionLimit: 10,
}
};
}
2 changes: 1 addition & 1 deletion packages/server/src/redis/redis.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { ChartRedisRepository } from './chart-redis.repository';
provide: 'TRADE_REDIS_CLIENT',
useFactory: () => {
const config = getRedisConfig();
const client = new Redis({ ...config, db: 1 });
const client = new Redis({ ...config, db: 4 });
const logger = new Logger('TRADE_REDIS_CLIENT');
client.on('connect', () => logger.log('트레이드용 Redis 연결 성공'));
client.on('error', (error) =>
Expand Down
38 changes: 22 additions & 16 deletions packages/server/src/trade/trade-ask-bid.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,31 @@ export class TradeAskBidService {
try {
const coinLatestInfo = this.coinDataUpdaterService.getCoinLatestInfo();
if (coinLatestInfo.size === 0) return;

const coinPrices = this.buildCoinPrices(coinLatestInfo);

const availableTrades = await this.redisRepository.findMatchingTrades(
tradeType,
coinPrices,
);

for (const trade of availableTrades) {
try{
const tradeDto = this.buildTradeDto(trade, coinLatestInfo, tradeType);
this.logger.debug(`처리 중인 거래: tradeId=${tradeDto.tradeId}`);
await handler(tradeDto);
} catch (err) {
this.logger.error(
`미체결 거래 처리 중 오류 발생: trade=${JSON.stringify(trade)}, error=${err.message}`,
err.stack,
);
}
}

// 병렬 처리로 모든 거래를 처리
await Promise.all(
availableTrades.map(async (trade) => {
try {
const tradeDto = this.buildTradeDto(trade, coinLatestInfo, tradeType);
this.logger.debug(`처리 중인 거래: tradeId=${tradeDto.tradeId}`);
await handler(tradeDto);
} catch (err) {
this.logger.error(
`미체결 거래 처리 중 오류 발생: trade=${JSON.stringify(
trade,
)}, error=${err.message}`,
err.stack,
);
}
}),
);
} catch (error) {
this.logger.error(
`미체결 거래 처리 전반적 오류: tradeType=${tradeType}, error=${error.message}`,
Expand All @@ -60,6 +65,7 @@ export class TradeAskBidService {
this.logger.log(`${tradeType} 미체결 거래 처리 완료`);
}
}

private buildCoinPrices(coinLatestInfo: Map<string, any>): CoinPriceDto[] {
const prices: CoinPriceDto[] = [];
coinLatestInfo.forEach((value, key) => {
Expand Down Expand Up @@ -126,7 +132,7 @@ export class TradeAskBidService {
tradeData.quantity = formatQuantity(tradeData.quantity - buyData.quantity);
if (isMinimumQuantity(tradeData.quantity)) {
await this.tradeRepository.deleteTrade(tradeData.tradeId, queryRunner);
await queryRunner.commitTransaction();

await this.redisRepository.deleteTrade(tradeData);
} else {
await this.tradeRepository.updateTradeQuantity(tradeData, queryRunner);
Expand Down
27 changes: 5 additions & 22 deletions packages/server/src/trade/trade-ask.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import { TradeAskBidService } from './trade-ask-bid.service';
@Injectable()
export class AskService extends TradeAskBidService implements OnModuleInit {
private isProcessing: { [key: number]: boolean } = {};
private transactionCreateAsk: boolean = false;

onModuleInit() {
this.startPendingTradesProcessor();
Expand Down Expand Up @@ -68,11 +67,6 @@ export class AskService extends TradeAskBidService implements OnModuleInit {
throw new BadRequestException('최소 거래 금액보다 작습니다.');
}

if (this.transactionCreateAsk) {
await this.waitForTransaction(() => this.transactionCreateAsk);
}
this.transactionCreateAsk = true;

try {
let userTrade;
const transactionResult = await this.executeTransaction(
Expand All @@ -82,8 +76,9 @@ export class AskService extends TradeAskBidService implements OnModuleInit {
}

const userAccount = await this.accountRepository.validateUserAccount(
user.userId,
user.userId, queryRunner
);

const userAsset = await this.checkAssetAvailability(
askDto,
userAccount,
Expand Down Expand Up @@ -127,8 +122,8 @@ export class AskService extends TradeAskBidService implements OnModuleInit {
await this.redisRepository.createTrade(tradeData);
}
return transactionResult;
} finally {
this.transactionCreateAsk = false;
} catch (error) {
console.error(`거래 생성 실패`, { error: error.stack, userId: user.userId });
}
}

Expand Down Expand Up @@ -239,7 +234,7 @@ export class AskService extends TradeAskBidService implements OnModuleInit {
}

buyData.price = formatQuantity(bid_price * krw);
const user = await this.userRepository.getUser(userId);
const user = await this.userRepository.getUserByQueryRunner(userId, queryRunner);

const assetName = buyData.assetName;
buyData.assetName = buyData.tradeCurrency;
Expand Down Expand Up @@ -310,16 +305,4 @@ export class AskService extends TradeAskBidService implements OnModuleInit {
queryRunner,
);
}

private async waitForTransaction(
checkCondition: () => boolean,
): Promise<void> {
return new Promise<void>((resolve) => {
const check = () => {
if (!checkCondition()) resolve();
else setTimeout(check, TRANSACTION_CHECK_INTERVAL);
};
check();
});
}
}
43 changes: 10 additions & 33 deletions packages/server/src/trade/trade-bid.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
BadRequestException,
Injectable,
OnApplicationBootstrap,
OnModuleInit,
UnprocessableEntityException,
} from '@nestjs/common';
Expand All @@ -19,15 +20,16 @@ import {
import { UPBIT_UPDATED_COIN_INFO_TIME } from '../upbit/constants';
import { TradeNotFoundException } from './exceptions/trade.exceptions';
import { TradeAskBidService } from './trade-ask-bid.service';
import { isMainThread, Worker } from 'worker_threads';
import { query } from 'express';

@Injectable()
export class BidService extends TradeAskBidService implements OnModuleInit {
private transactionCreateBid: boolean = false;
private isProcessing: { [key: number]: boolean } = {};

onModuleInit() {
onModuleInit() {
this.startPendingTradesProcessor();
}
}

private startPendingTradesProcessor() {
const processBidTrades = async () => {
Expand Down Expand Up @@ -60,26 +62,17 @@ export class BidService extends TradeAskBidService implements OnModuleInit {
if (isMinimumQuantity(bidDto.receivedAmount * bidDto.receivedPrice)) {
throw new BadRequestException('최소 거래 금액보다 작습니다.');
}

if (this.transactionCreateBid) {
await this.waitForTransaction(() => this.transactionCreateBid);
}
this.transactionCreateBid = true;

try {
let userTrade;
const transactionResult = await this.executeTransaction(
async (queryRunner) => {
if (bidDto.receivedAmount <= 0) {
throw new BadRequestException('수량은 0보다 커야 합니다.');
}

const userAccount = await this.accountRepository.validateUserAccount(
user.userId,
user.userId, queryRunner
);

await this.checkCurrencyBalance(bidDto, userAccount);

const { receivedPrice, receivedAmount } = bidDto;

await this.accountRepository.updateAccountCurrency(
Expand All @@ -88,7 +81,6 @@ export class BidService extends TradeAskBidService implements OnModuleInit {
userAccount.id,
queryRunner,
);

userTrade = await this.tradeRepository.createTrade(
bidDto,
user.userId,
Expand All @@ -112,12 +104,11 @@ export class BidService extends TradeAskBidService implements OnModuleInit {
quantity: bidDto.receivedAmount,
createdAt: userTrade.createdAt,
};

await this.redisRepository.createTrade(tradeData);
}
return transactionResult;
} finally {
this.transactionCreateBid = false;
}catch(error){
console.log(error);
}
}

Expand Down Expand Up @@ -156,9 +147,7 @@ export class BidService extends TradeAskBidService implements OnModuleInit {
if (order.ask_price > bidDto.receivedPrice) break;
const tradeResult = await this.executeTransaction(
async (queryRunner) => {
const account = await this.accountRepository.findOne({
where: { user: { id: userId } },
});
const account = await this.accountRepository.getAccount(userId, queryRunner);

bidDto.accountBalance = account[typeGiven];
bidDto.account = account;
Expand Down Expand Up @@ -211,7 +200,7 @@ export class BidService extends TradeAskBidService implements OnModuleInit {
}

buyData.price = formatQuantity(ask_price * krw);
const user = await this.userRepository.getUser(userId);
const user = await this.userRepository.getUserByQueryRunner(userId,queryRunner);

await this.tradeHistoryRepository.createTradeHistory(
user,
Expand Down Expand Up @@ -297,16 +286,4 @@ export class BidService extends TradeAskBidService implements OnModuleInit {
queryRunner,
);
}

private async waitForTransaction(
checkCondition: () => boolean,
): Promise<void> {
return new Promise<void>((resolve) => {
const check = () => {
if (!checkCondition()) resolve();
else setTimeout(check, TRANSACTION_CHECK_INTERVAL);
};
check();
});
}
}
1 change: 1 addition & 0 deletions packages/server/src/trade/trade.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { TradeService } from './trade.service';
import { TradeData } from './dtos/trade.interface';
import { TradeAskDto, TradeDto } from './dtos/trade.dto';
import { TRADE_TYPES } from './constants/trade.constants';
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';

@ApiTags('Trade')
@ApiBearerAuth('access-token')
Expand Down
3 changes: 2 additions & 1 deletion packages/server/src/trade/trade.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Module } from '@nestjs/common';
import { Logger, Module } from '@nestjs/common';
import { TradeController } from './trade.controller';
import { TypeOrmModule } from '@nestjs/typeorm';
import { Trade } from './trade.entity';
Expand Down Expand Up @@ -26,6 +26,7 @@ import { TradeAskBidService } from './trade-ask-bid.service';
TradeHistoryRepository,
TradeService,
TradeAskBidService,
Logger
],
controllers: [TradeController],
exports: [TradeRepository]
Expand Down
Loading

0 comments on commit 3c4037f

Please sign in to comment.