diff --git a/packages/server/src/account/account.module.ts b/packages/server/src/account/account.module.ts index f23fbaa2..d2d02d53 100644 --- a/packages/server/src/account/account.module.ts +++ b/packages/server/src/account/account.module.ts @@ -1,4 +1,4 @@ -import { Module, forwardRef } from '@nestjs/common'; +import { Logger, Module, forwardRef } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { Account } from './account.entity'; import { AccountRepository } from './account.repository'; @@ -18,6 +18,7 @@ import { CoinListService } from '@src/upbit/coin-list.service'; AssetRepository, AssetService, CoinListService, + Logger ], }) export class AccountModule {} diff --git a/packages/server/src/account/account.repository.ts b/packages/server/src/account/account.repository.ts index 22867e45..452585fc 100644 --- a/packages/server/src/account/account.repository.ts +++ b/packages/server/src/account/account.repository.ts @@ -157,18 +157,20 @@ export class AccountRepository extends Repository { } } - async validateUserAccount(userId: number): Promise { - this.logger.log(`사용자 계정 검증 시작: userId=${userId}`); - const userAccount = await this.findOne({ - where: { user: { id: userId } }, - }); - - if (!userAccount) { - this.logger.warn(`존재하지 않는 사용자 계정: userId=${userId}`); - throw new UnprocessableEntityException('유저가 존재하지 않습니다.'); + async validateUserAccount(userId: number, queryRunner): Promise { + try{ + this.logger.log(`사용자 계정 검증 시작: userId=${userId}`); + const userAccount = await queryRunner.manager.findOne(Account, { + where: { user: { id: userId } }, + }); + if (!userAccount) { + this.logger.warn(`존재하지 않는 사용자 계정: userId=${userId}`); + throw new UnprocessableEntityException('유저가 존재하지 않습니다.'); + } + return userAccount; + } catch (error) { + this.logger.error(`계좌 조회 실패: ${error.message}`, error.stack); } - - return userAccount; } async getAccount(id: number, queryRunner: QueryRunner): Promise { diff --git a/packages/server/src/asset/asset.module.ts b/packages/server/src/asset/asset.module.ts index 40474a17..12479dda 100644 --- a/packages/server/src/asset/asset.module.ts +++ b/packages/server/src/asset/asset.module.ts @@ -1,4 +1,4 @@ -import { Module } from '@nestjs/common'; +import { Logger, Module } from '@nestjs/common'; import { AssetService } from './asset.service'; import { AssetRepository } from './asset.repository'; import { TypeOrmModule } from '@nestjs/typeorm'; @@ -11,6 +11,7 @@ import { HttpModule } from '@nestjs/axios'; providers: [ AssetService, AssetRepository, + Logger ], exports: [AssetRepository], }) diff --git a/packages/server/src/auth/user.repository.ts b/packages/server/src/auth/user.repository.ts index 8a6c2c1c..31ebabf1 100644 --- a/packages/server/src/auth/user.repository.ts +++ b/packages/server/src/auth/user.repository.ts @@ -32,8 +32,32 @@ export class UserRepository extends Repository { throw error; } } - async validateUser(userId: number): Promise { - const user = await this.getUser(userId); + + async getUserByQueryRunner(userId: number, queryRunner): Promise { + try { + const user = await queryRunner.manager.findOne(User,{ + where: { id: userId }, + }); + + if (!user) { + this.logger.warn('User not found', { userId }); + throw new UserNotFoundException(userId); + } + + return user; + } catch (error) { + if (!(error instanceof UserNotFoundException)) { + this.logger.error('Failed to fetch user', { + userId, + error: error.stack, + }); + } + throw error; + } + } + + async validateUser(userId: number, queryRunner): Promise { + const user = await this.getUserByQueryRunner(userId, queryRunner); if (!user) { throw new UserNotFoundException(userId); diff --git a/packages/server/src/configs/typeorm.config.ts b/packages/server/src/configs/typeorm.config.ts index 591a596a..490dc2e3 100644 --- a/packages/server/src/configs/typeorm.config.ts +++ b/packages/server/src/configs/typeorm.config.ts @@ -1,4 +1,5 @@ import { TypeOrmModuleOptions } from '@nestjs/typeorm'; +import { connect } from 'http2'; export default function getTypeOrmConfig(): TypeOrmModuleOptions { return { @@ -11,5 +12,8 @@ export default function getTypeOrmConfig(): TypeOrmModuleOptions { entities: [__dirname + '/../**/*.entity.{js,ts}'], synchronize: process.env.DB_SYNCHRONIZE === 'true', dropSchema: process.env.DB_DROPSCHEMA === 'true', + extra:{ + connectionLimit: 10, + } }; } diff --git a/packages/server/src/redis/redis.module.ts b/packages/server/src/redis/redis.module.ts index 43f81157..1640abb1 100644 --- a/packages/server/src/redis/redis.module.ts +++ b/packages/server/src/redis/redis.module.ts @@ -12,7 +12,7 @@ import { ChartRedisRepository } from './chart-redis.repository'; provide: 'TRADE_REDIS_CLIENT', useFactory: () => { const config = getRedisConfig(); - const client = new Redis({ ...config, db: 1 }); + const client = new Redis({ ...config, db: 4 }); const logger = new Logger('TRADE_REDIS_CLIENT'); client.on('connect', () => logger.log('트레이드용 Redis 연결 성공')); client.on('error', (error) => diff --git a/packages/server/src/trade/trade-ask-bid.service.ts b/packages/server/src/trade/trade-ask-bid.service.ts index db787635..1bd44a77 100644 --- a/packages/server/src/trade/trade-ask-bid.service.ts +++ b/packages/server/src/trade/trade-ask-bid.service.ts @@ -31,26 +31,31 @@ export class TradeAskBidService { try { const coinLatestInfo = this.coinDataUpdaterService.getCoinLatestInfo(); if (coinLatestInfo.size === 0) return; - + const coinPrices = this.buildCoinPrices(coinLatestInfo); - + const availableTrades = await this.redisRepository.findMatchingTrades( tradeType, coinPrices, ); - - for (const trade of availableTrades) { - try{ - const tradeDto = this.buildTradeDto(trade, coinLatestInfo, tradeType); - this.logger.debug(`처리 중인 거래: tradeId=${tradeDto.tradeId}`); - await handler(tradeDto); - } catch (err) { - this.logger.error( - `미체결 거래 처리 중 오류 발생: trade=${JSON.stringify(trade)}, error=${err.message}`, - err.stack, - ); - } - } + + // 병렬 처리로 모든 거래를 처리 + await Promise.all( + availableTrades.map(async (trade) => { + try { + const tradeDto = this.buildTradeDto(trade, coinLatestInfo, tradeType); + this.logger.debug(`처리 중인 거래: tradeId=${tradeDto.tradeId}`); + await handler(tradeDto); + } catch (err) { + this.logger.error( + `미체결 거래 처리 중 오류 발생: trade=${JSON.stringify( + trade, + )}, error=${err.message}`, + err.stack, + ); + } + }), + ); } catch (error) { this.logger.error( `미체결 거래 처리 전반적 오류: tradeType=${tradeType}, error=${error.message}`, @@ -60,6 +65,7 @@ export class TradeAskBidService { this.logger.log(`${tradeType} 미체결 거래 처리 완료`); } } + private buildCoinPrices(coinLatestInfo: Map): CoinPriceDto[] { const prices: CoinPriceDto[] = []; coinLatestInfo.forEach((value, key) => { @@ -126,7 +132,7 @@ export class TradeAskBidService { tradeData.quantity = formatQuantity(tradeData.quantity - buyData.quantity); if (isMinimumQuantity(tradeData.quantity)) { await this.tradeRepository.deleteTrade(tradeData.tradeId, queryRunner); - await queryRunner.commitTransaction(); + await this.redisRepository.deleteTrade(tradeData); } else { await this.tradeRepository.updateTradeQuantity(tradeData, queryRunner); diff --git a/packages/server/src/trade/trade-ask.service.ts b/packages/server/src/trade/trade-ask.service.ts index e127dac9..5773caf4 100644 --- a/packages/server/src/trade/trade-ask.service.ts +++ b/packages/server/src/trade/trade-ask.service.ts @@ -23,7 +23,6 @@ import { TradeAskBidService } from './trade-ask-bid.service'; @Injectable() export class AskService extends TradeAskBidService implements OnModuleInit { private isProcessing: { [key: number]: boolean } = {}; - private transactionCreateAsk: boolean = false; onModuleInit() { this.startPendingTradesProcessor(); @@ -68,11 +67,6 @@ export class AskService extends TradeAskBidService implements OnModuleInit { throw new BadRequestException('최소 거래 금액보다 작습니다.'); } - if (this.transactionCreateAsk) { - await this.waitForTransaction(() => this.transactionCreateAsk); - } - this.transactionCreateAsk = true; - try { let userTrade; const transactionResult = await this.executeTransaction( @@ -82,8 +76,9 @@ export class AskService extends TradeAskBidService implements OnModuleInit { } const userAccount = await this.accountRepository.validateUserAccount( - user.userId, + user.userId, queryRunner ); + const userAsset = await this.checkAssetAvailability( askDto, userAccount, @@ -127,8 +122,8 @@ export class AskService extends TradeAskBidService implements OnModuleInit { await this.redisRepository.createTrade(tradeData); } return transactionResult; - } finally { - this.transactionCreateAsk = false; + } catch (error) { + console.error(`거래 생성 실패`, { error: error.stack, userId: user.userId }); } } @@ -239,7 +234,7 @@ export class AskService extends TradeAskBidService implements OnModuleInit { } buyData.price = formatQuantity(bid_price * krw); - const user = await this.userRepository.getUser(userId); + const user = await this.userRepository.getUserByQueryRunner(userId, queryRunner); const assetName = buyData.assetName; buyData.assetName = buyData.tradeCurrency; @@ -310,16 +305,4 @@ export class AskService extends TradeAskBidService implements OnModuleInit { queryRunner, ); } - - private async waitForTransaction( - checkCondition: () => boolean, - ): Promise { - return new Promise((resolve) => { - const check = () => { - if (!checkCondition()) resolve(); - else setTimeout(check, TRANSACTION_CHECK_INTERVAL); - }; - check(); - }); - } } diff --git a/packages/server/src/trade/trade-bid.service.ts b/packages/server/src/trade/trade-bid.service.ts index acde1b54..7722ad88 100644 --- a/packages/server/src/trade/trade-bid.service.ts +++ b/packages/server/src/trade/trade-bid.service.ts @@ -1,6 +1,7 @@ import { BadRequestException, Injectable, + OnApplicationBootstrap, OnModuleInit, UnprocessableEntityException, } from '@nestjs/common'; @@ -19,15 +20,16 @@ import { import { UPBIT_UPDATED_COIN_INFO_TIME } from '../upbit/constants'; import { TradeNotFoundException } from './exceptions/trade.exceptions'; import { TradeAskBidService } from './trade-ask-bid.service'; +import { isMainThread, Worker } from 'worker_threads'; +import { query } from 'express'; @Injectable() export class BidService extends TradeAskBidService implements OnModuleInit { - private transactionCreateBid: boolean = false; private isProcessing: { [key: number]: boolean } = {}; - onModuleInit() { + onModuleInit() { this.startPendingTradesProcessor(); - } + } private startPendingTradesProcessor() { const processBidTrades = async () => { @@ -60,12 +62,6 @@ export class BidService extends TradeAskBidService implements OnModuleInit { if (isMinimumQuantity(bidDto.receivedAmount * bidDto.receivedPrice)) { throw new BadRequestException('최소 거래 금액보다 작습니다.'); } - - if (this.transactionCreateBid) { - await this.waitForTransaction(() => this.transactionCreateBid); - } - this.transactionCreateBid = true; - try { let userTrade; const transactionResult = await this.executeTransaction( @@ -73,13 +69,10 @@ export class BidService extends TradeAskBidService implements OnModuleInit { if (bidDto.receivedAmount <= 0) { throw new BadRequestException('수량은 0보다 커야 합니다.'); } - const userAccount = await this.accountRepository.validateUserAccount( - user.userId, + user.userId, queryRunner ); - await this.checkCurrencyBalance(bidDto, userAccount); - const { receivedPrice, receivedAmount } = bidDto; await this.accountRepository.updateAccountCurrency( @@ -88,7 +81,6 @@ export class BidService extends TradeAskBidService implements OnModuleInit { userAccount.id, queryRunner, ); - userTrade = await this.tradeRepository.createTrade( bidDto, user.userId, @@ -112,12 +104,11 @@ export class BidService extends TradeAskBidService implements OnModuleInit { quantity: bidDto.receivedAmount, createdAt: userTrade.createdAt, }; - await this.redisRepository.createTrade(tradeData); } return transactionResult; - } finally { - this.transactionCreateBid = false; + }catch(error){ + console.log(error); } } @@ -156,9 +147,7 @@ export class BidService extends TradeAskBidService implements OnModuleInit { if (order.ask_price > bidDto.receivedPrice) break; const tradeResult = await this.executeTransaction( async (queryRunner) => { - const account = await this.accountRepository.findOne({ - where: { user: { id: userId } }, - }); + const account = await this.accountRepository.getAccount(userId, queryRunner); bidDto.accountBalance = account[typeGiven]; bidDto.account = account; @@ -211,7 +200,7 @@ export class BidService extends TradeAskBidService implements OnModuleInit { } buyData.price = formatQuantity(ask_price * krw); - const user = await this.userRepository.getUser(userId); + const user = await this.userRepository.getUserByQueryRunner(userId,queryRunner); await this.tradeHistoryRepository.createTradeHistory( user, @@ -297,16 +286,4 @@ export class BidService extends TradeAskBidService implements OnModuleInit { queryRunner, ); } - - private async waitForTransaction( - checkCondition: () => boolean, - ): Promise { - return new Promise((resolve) => { - const check = () => { - if (!checkCondition()) resolve(); - else setTimeout(check, TRANSACTION_CHECK_INTERVAL); - }; - check(); - }); - } } diff --git a/packages/server/src/trade/trade.controller.ts b/packages/server/src/trade/trade.controller.ts index e0ff4be4..5c04108f 100644 --- a/packages/server/src/trade/trade.controller.ts +++ b/packages/server/src/trade/trade.controller.ts @@ -27,6 +27,7 @@ import { TradeService } from './trade.service'; import { TradeData } from './dtos/trade.interface'; import { TradeAskDto, TradeDto } from './dtos/trade.dto'; import { TRADE_TYPES } from './constants/trade.constants'; +import { Worker, isMainThread, parentPort, workerData } from 'worker_threads'; @ApiTags('Trade') @ApiBearerAuth('access-token') diff --git a/packages/server/src/trade/trade.module.ts b/packages/server/src/trade/trade.module.ts index bbe68cb3..7e4f8a8d 100644 --- a/packages/server/src/trade/trade.module.ts +++ b/packages/server/src/trade/trade.module.ts @@ -1,4 +1,4 @@ -import { Module } from '@nestjs/common'; +import { Logger, Module } from '@nestjs/common'; import { TradeController } from './trade.controller'; import { TypeOrmModule } from '@nestjs/typeorm'; import { Trade } from './trade.entity'; @@ -26,6 +26,7 @@ import { TradeAskBidService } from './trade-ask-bid.service'; TradeHistoryRepository, TradeService, TradeAskBidService, + Logger ], controllers: [TradeController], exports: [TradeRepository] diff --git a/packages/server/src/trade/trade.repository.ts b/packages/server/src/trade/trade.repository.ts index e5e699e1..839e069a 100644 --- a/packages/server/src/trade/trade.repository.ts +++ b/packages/server/src/trade/trade.repository.ts @@ -35,9 +35,9 @@ export class TradeRepository extends Repository { queryRunner: QueryRunner, ): Promise { try { - const user = await this.userRepository.getUser(userId); + const user = await this.userRepository.getUserByQueryRunner(userId,queryRunner); - this.userRepository.validateUser(userId); + this.userRepository.validateUser(userId,queryRunner); const trade = this.createTradeEntity(tradeDto, user, tradeType); const savedTrade = await queryRunner.manager.save(Trade, trade);