diff --git a/apps/api/.env.example b/apps/api/.env.example index dadc06b5..e06a2520 100644 --- a/apps/api/.env.example +++ b/apps/api/.env.example @@ -61,4 +61,8 @@ RABBITMQ_RECORD_SYNC_QUEUE= # Firebase FIREBASE_PROJECT_ID= FIREBASE_CLIENT_EMAIL= -FIREBASE_PRIVATE_KEY= \ No newline at end of file +FIREBASE_PRIVATE_KEY= + +# Grafana +GRAFANA_ADMIN_USER= +GRAFANA_ADMIN_PASSWORD= \ No newline at end of file diff --git a/apps/api/docker-compose-dev.yml b/apps/api/docker-compose-dev.yml index 2e286855..734c5fea 100644 --- a/apps/api/docker-compose-dev.yml +++ b/apps/api/docker-compose-dev.yml @@ -63,10 +63,48 @@ services: networks: - locus-network + prometheus: + image: prom/prometheus:latest + container_name: locus-prometheus + ports: + - '9090:9090' + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml + - prometheus_data:/prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + - '--web.console.libraries=/usr/share/prometheus/console_libraries' + - '--web.console.templates=/usr/share/prometheus/consoles' + restart: unless-stopped + networks: + - locus-network + + # Grafana 대시보드 + grafana: + image: grafana/grafana:latest + container_name: locus-grafana + ports: + - '3001:3000' + environment: + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=admin123 + # - GF_USERS_ALLOW_SIGN_UP=false + volumes: + - grafana_data:/var/lib/grafana + - ./grafana/provisioning:/etc/grafana/provisioning + restart: unless-stopped + networks: + - locus-network + depends_on: + - prometheus + volumes: elasticsearch_data: postgres_data: redis_data: + prometheus_data: + grafana_data: networks: locus-network: diff --git a/apps/api/package.json b/apps/api/package.json index 84131a78..c9cad889 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -41,6 +41,7 @@ "@nestjs/swagger": "^11.2.4", "@prisma/adapter-pg": "^7.2.0", "@prisma/client": "^7.2.0", + "@willsoto/nestjs-prometheus": "^6.0.2", "amqp-connection-manager": "^5.0.0", "amqplib": "^0.10.9", "axios": "^1.13.4", @@ -57,6 +58,7 @@ "passport-kakao": "^1.0.1", "passport-oauth2": "^1.8.0", "pg": "^8.16.3", + "prom-client": "^15.1.3", "redis": "^5.10.0", "reflect-metadata": "^0.2.2", "rxjs": "^7.8.1", diff --git a/apps/api/prometheus.yml b/apps/api/prometheus.yml new file mode 100644 index 00000000..15ed4ea9 --- /dev/null +++ b/apps/api/prometheus.yml @@ -0,0 +1,18 @@ +global: + scrape_interval: 15s # 15초마다 메트릭 수집 + evaluation_interval: 15s + +scrape_configs: + # NestJS 애플리케이션 + - job_name: 'nestjs-app' + static_configs: + - targets: ['host.docker.internal:3000'] # Docker 내부에서 호스트 접근 + labels: + service: 'nestjs' + env: 'development' + metrics_path: '/api/metrics' # GET /api/metrics 엔드포인트 + + # Prometheus 자체 모니터링 + - job_name: 'prometheus' + static_configs: + - targets: ['localhost:9090'] diff --git a/apps/api/src/app.module.ts b/apps/api/src/app.module.ts index d9a0efc7..7ff5ca23 100644 --- a/apps/api/src/app.module.ts +++ b/apps/api/src/app.module.ts @@ -19,6 +19,7 @@ import { TagsModule } from './tags/tags.module'; import { MapsModule } from './maps/maps.module'; import { NotificationModule } from './notification/notification.module'; import { ImagesModule } from './images/images.module'; +import { PrometheusModule } from './infra/monitoring/prometheus.module'; import { DuckModule } from './duck/duck.module'; @Module({ @@ -26,6 +27,7 @@ import { DuckModule } from './duck/duck.module'; ScheduleModule.forRoot(), ConfigModule.forRoot({ isGlobal: true }), PrismaModule, + PrometheusModule, RabbitMqModule, OutboxModule, RedisModule, diff --git a/apps/api/src/common/interceptors/response-tranform.interceptor.ts b/apps/api/src/common/interceptors/response-tranform.interceptor.ts index a8b4c630..38e98444 100644 --- a/apps/api/src/common/interceptors/response-tranform.interceptor.ts +++ b/apps/api/src/common/interceptors/response-tranform.interceptor.ts @@ -7,6 +7,7 @@ import { import { map, Observable } from 'rxjs'; import { ApiResponseType } from '../type/api-response.types'; import { ApiResponse } from '../utils/api-response.helper'; +import { Request } from 'express'; @Injectable() export class ResponseTransformInterceptor @@ -16,6 +17,10 @@ export class ResponseTransformInterceptor context: ExecutionContext, next: CallHandler, ): Observable> { + const request = context.switchToHttp().getRequest(); + if (request.url.includes('/metrics')) { + return next.handle() as Observable>; + } return next.handle().pipe(map((data: T) => ApiResponse.success(data))); } } diff --git a/apps/api/src/infra/monitoring/constants/metrics.constants.ts b/apps/api/src/infra/monitoring/constants/metrics.constants.ts new file mode 100644 index 00000000..6b5f469f --- /dev/null +++ b/apps/api/src/infra/monitoring/constants/metrics.constants.ts @@ -0,0 +1,24 @@ +export const API_METRICS = { + HTTP_REQUESTS_TOTAL: 'http_requests_total', + HTTP_REQUEST_DURATION_SEC: 'http_request_duration_seconds', +}; + +export const OUTBOX_METRICS = { + OUTBOX_EVENTS_PUBLISHED_TOTAL: 'outbox_events_published_total', + STATUS_TRANSITIONS_TOTAL: 'outbox_status_transitions_total', + OUTBOX_PROCESSING_DURATION_SEC: 'outbox_processing_duration_seconds', + OUTBOX_DEAD_LETTER_TOTAL: 'outbox_dead_letter_total', +}; + +export const RABBITMQ_METRICS = { + MESSAGES_PUBLISHED_TOTAL: 'messages_published_total', + MESSAGES_CONSUMED_TOTAL: 'messages_consumed_total', + MESSAGES_PROCESSING_DURATION_SEC: 'message_processing_duration_seconds', + MESSAGES_IN_FLIGHT: 'messages_in_flight', +}; + +export const ELASTICSEARCH_METRICS = { + OPERATIONS_TOTAL: 'elasticsearch_operations_total', + OPERATION_DURATION_SEC: 'elasticsearch_operation_duration_seconds', + RECORD_SYNC_EVENTS_TOTAL: 'record_sync_events_total', +}; diff --git a/apps/api/src/infra/monitoring/interceptor/api-metrics.interceptor.ts b/apps/api/src/infra/monitoring/interceptor/api-metrics.interceptor.ts new file mode 100644 index 00000000..59ca0b90 --- /dev/null +++ b/apps/api/src/infra/monitoring/interceptor/api-metrics.interceptor.ts @@ -0,0 +1,61 @@ +import { + Injectable, + NestInterceptor, + ExecutionContext, + CallHandler, +} from '@nestjs/common'; +import { Observable } from 'rxjs'; +import { tap } from 'rxjs/operators'; +import { ApiMetricsService } from '../services/api-metrics.service'; +import { Request, Response } from 'express'; + +@Injectable() +export class ApiMetricsInterceptor implements NestInterceptor { + private readonly EXCLUDE_PATHS = ['/api/metrics']; + + constructor(private readonly apiMetricsService: ApiMetricsService) {} + + intercept(context: ExecutionContext, next: CallHandler): Observable { + const request = context.switchToHttp().getRequest(); + const method: string = request.method; + const url: string = request.url; + const route = request.route as { path: string } | undefined; + const path: string = route?.path ?? url; + + if (this.EXCLUDE_PATHS.some((excludePath) => path.includes(excludePath))) { + return next.handle(); + } + + const startTime = Date.now(); + + return next.handle().pipe( + tap({ + next: () => { + const response = context.switchToHttp().getResponse(); + const statusCode = response.statusCode; + const duration = (Date.now() - startTime) / 1000; + this.recordApiMetrics(method, path, statusCode, duration); + }, + error: (error: { status?: number; statusCode?: number }) => { + const statusCode = error.status ?? error.statusCode ?? 500; + const duration = (Date.now() - startTime) / 1000; + this.recordApiMetrics(method, path, statusCode, duration); + }, + }), + ); + } + private recordApiMetrics( + method: string, + path: string, + status: number, + duration: number, + ) { + this.apiMetricsService.recordRequest(method, path, status); + this.apiMetricsService.recordAPIDurationTime( + method, + path, + status, + duration, + ); + } +} diff --git a/apps/api/src/infra/monitoring/metrics.provider.ts b/apps/api/src/infra/monitoring/metrics.provider.ts new file mode 100644 index 00000000..49902e2a --- /dev/null +++ b/apps/api/src/infra/monitoring/metrics.provider.ts @@ -0,0 +1,110 @@ +import { + makeCounterProvider, + makeGaugeProvider, + makeHistogramProvider, +} from '@willsoto/nestjs-prometheus'; +import { + API_METRICS, + ELASTICSEARCH_METRICS, + OUTBOX_METRICS, + RABBITMQ_METRICS, +} from './constants/metrics.constants'; + +export const metricsProviders = [ + // HTTP 요청 총 개수 + makeCounterProvider({ + name: API_METRICS.HTTP_REQUESTS_TOTAL, + help: 'Total number of HTTP requests', + labelNames: ['method', 'path', 'status'], + }), + + // HTTP 요청 응답 시간 (히스토그램) + makeHistogramProvider({ + name: API_METRICS.HTTP_REQUEST_DURATION_SEC, + help: 'Duration of HTTP requests in seconds', + labelNames: ['method', 'path', 'status'], + buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10], + }), +]; + +export const outboxMetricsProviders = [ + makeCounterProvider({ + name: OUTBOX_METRICS.OUTBOX_EVENTS_PUBLISHED_TOTAL, + help: 'Total number of outbox events published to RabbitMQ', + labelNames: ['status', 'event_type'], + }), + + makeCounterProvider({ + name: OUTBOX_METRICS.STATUS_TRANSITIONS_TOTAL, + help: 'Total number of outbox status transitions', + labelNames: ['from_status', 'to_status', 'event_type'], + }), + + makeHistogramProvider({ + name: OUTBOX_METRICS.OUTBOX_PROCESSING_DURATION_SEC, + help: 'Duration from outbox event creation to publication', + labelNames: ['event_type'], + buckets: [0.1, 0.5, 1, 5, 10, 30, 60], + }), + + makeCounterProvider({ + name: OUTBOX_METRICS.OUTBOX_DEAD_LETTER_TOTAL, + help: 'Total number of events moved to dead letter queue', + labelNames: ['event_type'], + }), +]; + +export const rabbitMQMetricsProviders = [ + makeCounterProvider({ + name: RABBITMQ_METRICS.MESSAGES_PUBLISHED_TOTAL, + help: 'Total number of messages published', + labelNames: ['pattern', 'status'], + }), + + makeCounterProvider({ + name: RABBITMQ_METRICS.MESSAGES_CONSUMED_TOTAL, + help: 'Total number of messages consumed', + labelNames: ['pattern', 'status'], + }), + + makeHistogramProvider({ + name: RABBITMQ_METRICS.MESSAGES_PROCESSING_DURATION_SEC, + help: 'Duration of message processing', + labelNames: ['pattern', 'event_type'], + buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10], + }), + + makeGaugeProvider({ + name: RABBITMQ_METRICS.MESSAGES_IN_FLIGHT, + help: 'Number of messages currently in flight', + labelNames: ['pattern'], + }), +]; + +export const elasticSearchMetricsProviders = [ + makeCounterProvider({ + name: ELASTICSEARCH_METRICS.OPERATIONS_TOTAL, + help: 'Total number of Elasticsearch operations', + labelNames: ['operation', 'status'], + }), + + makeHistogramProvider({ + name: ELASTICSEARCH_METRICS.OPERATION_DURATION_SEC, + help: 'Duration of Elasticsearch operations', + labelNames: ['operation'], + buckets: [0.01, 0.1, 0.5, 1, 5], + }), + + makeCounterProvider({ + name: ELASTICSEARCH_METRICS.RECORD_SYNC_EVENTS_TOTAL, + help: 'Total number of record sync events processed', + labelNames: ['event_type'], + }), +]; + +export const allMetricsProviders = [ + ...metricsProviders, + ...outboxMetricsProviders, + ...rabbitMQMetricsProviders, + ...elasticSearchMetricsProviders, +]; diff --git a/apps/api/src/infra/monitoring/prometheus.module.ts b/apps/api/src/infra/monitoring/prometheus.module.ts new file mode 100644 index 00000000..76285331 --- /dev/null +++ b/apps/api/src/infra/monitoring/prometheus.module.ts @@ -0,0 +1,37 @@ +import { Module } from '@nestjs/common'; +import { PrometheusModule as NestPrometheusModule } from '@willsoto/nestjs-prometheus'; +import { allMetricsProviders } from './metrics.provider'; +import { ApiMetricsInterceptor } from '@/infra/monitoring/interceptor/api-metrics.interceptor'; +import { APP_INTERCEPTOR } from '@nestjs/core'; +import { ApiMetricsService } from './services/api-metrics.service'; +import { OutboxMetricsService } from './services/outbox-metrics.service'; +import { RabbitMQMetricsService } from './services/rabbitmq-metrics.service'; +import { ElasticsearchMetricsService } from './services/elasticsearch-metrics.service'; + +@Module({ + imports: [ + NestPrometheusModule.register({ + defaultMetrics: { enabled: true }, + path: '/metrics', + defaultLabels: { app: 'locus' }, + }), + ], + providers: [ + ...allMetricsProviders, + ApiMetricsService, + OutboxMetricsService, + RabbitMQMetricsService, + ElasticsearchMetricsService, + { + provide: APP_INTERCEPTOR, + useClass: ApiMetricsInterceptor, + }, + ], + exports: [ + NestPrometheusModule, + OutboxMetricsService, + RabbitMQMetricsService, + ElasticsearchMetricsService, + ], +}) +export class PrometheusModule {} diff --git a/apps/api/src/infra/monitoring/services/api-metrics.service.ts b/apps/api/src/infra/monitoring/services/api-metrics.service.ts new file mode 100644 index 00000000..dc9ab3f8 --- /dev/null +++ b/apps/api/src/infra/monitoring/services/api-metrics.service.ts @@ -0,0 +1,32 @@ +import { Injectable } from '@nestjs/common'; +import { InjectMetric } from '@willsoto/nestjs-prometheus'; +import { Counter, Histogram } from 'prom-client'; +import { API_METRICS } from '../constants/metrics.constants'; + +@Injectable() +export class ApiMetricsService { + constructor( + @InjectMetric(API_METRICS.HTTP_REQUESTS_TOTAL) + private readonly requestsTotal: Counter, + + @InjectMetric(API_METRICS.HTTP_REQUEST_DURATION_SEC) + private readonly requestDuration: Histogram, + ) {} + + // request 기록 + recordRequest(method: string, path: string, statusCode: number): void { + this.requestsTotal.inc({ method, path, status: statusCode.toString() }); + } + + recordAPIDurationTime( + method: string, + path: string, + statusCode: number, + durationSeconds: number, + ): void { + this.requestDuration.observe( + { method, path, status: statusCode.toString() }, + durationSeconds, + ); + } +} diff --git a/apps/api/src/infra/monitoring/services/elasticsearch-metrics.service.ts b/apps/api/src/infra/monitoring/services/elasticsearch-metrics.service.ts new file mode 100644 index 00000000..96941cff --- /dev/null +++ b/apps/api/src/infra/monitoring/services/elasticsearch-metrics.service.ts @@ -0,0 +1,39 @@ +import { Injectable } from '@nestjs/common'; +import { InjectMetric } from '@willsoto/nestjs-prometheus'; +import { Counter, Histogram } from 'prom-client'; +import { ELASTICSEARCH_METRICS } from '../constants/metrics.constants'; + +export type ESOperation = 'index' | 'update' | 'delete'; + +@Injectable() +export class ElasticsearchMetricsService { + constructor( + @InjectMetric(ELASTICSEARCH_METRICS.OPERATIONS_TOTAL) + private readonly operationsCounter: Counter, + + @InjectMetric(ELASTICSEARCH_METRICS.OPERATION_DURATION_SEC) + private readonly operationDuration: Histogram, + + @InjectMetric(ELASTICSEARCH_METRICS.RECORD_SYNC_EVENTS_TOTAL) + private readonly syncEventsCounter: Counter, + ) {} + + recordOperationSuccess(operation: ESOperation): void { + this.operationsCounter.inc({ operation, status: 'success' }); + } + + recordOperationFailure(operation: ESOperation): void { + this.operationsCounter.inc({ operation, status: 'failed' }); + } + + recordOperationDuration( + operation: ESOperation, + durationSeconds: number, + ): void { + this.operationDuration.observe({ operation }, durationSeconds); + } + + recordSyncEvent(eventType: string): void { + this.syncEventsCounter.inc({ event_type: eventType }); + } +} diff --git a/apps/api/src/infra/monitoring/services/outbox-metrics.service.ts b/apps/api/src/infra/monitoring/services/outbox-metrics.service.ts new file mode 100644 index 00000000..4c91c430 --- /dev/null +++ b/apps/api/src/infra/monitoring/services/outbox-metrics.service.ts @@ -0,0 +1,49 @@ +import { Injectable } from '@nestjs/common'; +import { InjectMetric } from '@willsoto/nestjs-prometheus'; +import { Counter, Histogram } from 'prom-client'; +import { OUTBOX_METRICS } from '../constants/metrics.constants'; + +@Injectable() +export class OutboxMetricsService { + constructor( + @InjectMetric(OUTBOX_METRICS.OUTBOX_EVENTS_PUBLISHED_TOTAL) + private readonly publishedCounter: Counter, + + @InjectMetric(OUTBOX_METRICS.STATUS_TRANSITIONS_TOTAL) + private readonly statusTransitionCounter: Counter, + + @InjectMetric(OUTBOX_METRICS.OUTBOX_PROCESSING_DURATION_SEC) + private readonly processingDuration: Histogram, + + @InjectMetric(OUTBOX_METRICS.OUTBOX_DEAD_LETTER_TOTAL) + private readonly deadLetterCounter: Counter, + ) {} + + recordPublishSuccess(eventType: string): void { + this.publishedCounter.inc({ status: 'success', event_type: eventType }); + } + + recordPublishFailure(eventType: string): void { + this.publishedCounter.inc({ status: 'failed', event_type: eventType }); + } + + recordProcessingDuration(eventType: string, durationSeconds: number): void { + this.processingDuration.observe({ event_type: eventType }, durationSeconds); + } + + recordDeadLetter(eventType: string): void { + this.deadLetterCounter.inc({ event_type: eventType }); + } + + recordStatusTransition( + fromStatus: string, + toStatus: string, + eventType: string, + ): void { + this.statusTransitionCounter.inc({ + from_status: fromStatus, + to_status: toStatus, + event_type: eventType, + }); + } +} diff --git a/apps/api/src/infra/monitoring/services/rabbitmq-metrics.service.ts b/apps/api/src/infra/monitoring/services/rabbitmq-metrics.service.ts new file mode 100644 index 00000000..9d5bef62 --- /dev/null +++ b/apps/api/src/infra/monitoring/services/rabbitmq-metrics.service.ts @@ -0,0 +1,56 @@ +import { Injectable } from '@nestjs/common'; +import { InjectMetric } from '@willsoto/nestjs-prometheus'; +import { Counter, Gauge, Histogram } from 'prom-client'; +import { RABBITMQ_METRICS } from '@/infra/monitoring/constants/metrics.constants'; + +@Injectable() +export class RabbitMQMetricsService { + constructor( + @InjectMetric(RABBITMQ_METRICS.MESSAGES_PUBLISHED_TOTAL) + private readonly publishedCounter: Counter, + + @InjectMetric(RABBITMQ_METRICS.MESSAGES_CONSUMED_TOTAL) + private readonly consumedCounter: Counter, + + @InjectMetric(RABBITMQ_METRICS.MESSAGES_PROCESSING_DURATION_SEC) + private readonly processingDuration: Histogram, + + @InjectMetric(RABBITMQ_METRICS.MESSAGES_IN_FLIGHT) + private readonly inFlightGauge: Gauge, + ) {} + + recordPublishSuccess(pattern: string): void { + this.publishedCounter.inc({ pattern, status: 'success' }); + } + + recordPublishFailure(pattern: string): void { + this.publishedCounter.inc({ pattern, status: 'failed' }); + } + + recordConsumeAck(pattern: string): void { + this.consumedCounter.inc({ pattern, status: 'ack' }); + } + + recordConsumeNack(pattern: string): void { + this.consumedCounter.inc({ pattern, status: 'nack' }); + } + + recordProcessingDuration( + pattern: string, + eventType: string, + durationSeconds: number, + ): void { + this.processingDuration.observe( + { pattern, event_type: eventType }, + durationSeconds, + ); + } + + incrementInFlight(pattern: string): void { + this.inFlightGauge.inc({ pattern }); + } + + decrementInFlight(pattern: string): void { + this.inFlightGauge.dec({ pattern }); + } +} diff --git a/apps/api/src/outbox/outbox.module.ts b/apps/api/src/outbox/outbox.module.ts index a413b8a6..2667edd8 100644 --- a/apps/api/src/outbox/outbox.module.ts +++ b/apps/api/src/outbox/outbox.module.ts @@ -2,9 +2,10 @@ import { Module } from '@nestjs/common'; import { OutboxPublisher } from './outbox.publisher'; import { RabbitMqModule } from '@/mq/rabbitmq.module'; import { OutboxService } from './outbox.service'; +import { PrometheusModule } from '@/infra/monitoring/prometheus.module'; @Module({ - imports: [RabbitMqModule], + imports: [RabbitMqModule, PrometheusModule], providers: [OutboxPublisher, OutboxService], exports: [OutboxService], }) diff --git a/apps/api/src/outbox/outbox.publisher.ts b/apps/api/src/outbox/outbox.publisher.ts index 0748d556..85024dc5 100644 --- a/apps/api/src/outbox/outbox.publisher.ts +++ b/apps/api/src/outbox/outbox.publisher.ts @@ -10,6 +10,8 @@ import { OutboxEvent, } from '@/common/constants/event-types.constants'; import { OutboxService } from './outbox.service'; +import { OutboxMetricsService } from '@/infra/monitoring/services/outbox-metrics.service'; +import { RabbitMQMetricsService } from '@/infra/monitoring/services/rabbitmq-metrics.service'; @Injectable() export class OutboxPublisher implements OnModuleDestroy { @@ -21,6 +23,8 @@ export class OutboxPublisher implements OnModuleDestroy { private readonly outboxService: OutboxService, @Inject(RABBITMQ_CONSTANTS.CLIENTS.RECORD_SYNC_PRODUCER) private readonly client: ClientProxy, + private readonly outboxMetricsService: OutboxMetricsService, + private readonly rabbitMQMetricsService: RabbitMQMetricsService, ) {} async onModuleInit() { @@ -65,7 +69,21 @@ export class OutboxPublisher implements OnModuleDestroy { const outboxEvent = this.convertToOutboxEvent(event); await this.sendToRabbitMQ(outboxEvent); await this.outboxService.updateStatus(event.id, OutboxStatus.DONE); + + this.outboxMetricsService.recordStatusTransition( + OutboxStatus.PENDING, + OutboxStatus.DONE, + event.eventType, + ); + + const processingTime = (Date.now() - event.createdAt.getTime()) / 1000; + this.outboxMetricsService.recordProcessingDuration( + event.eventType, + processingTime, + ); + this.outboxMetricsService.recordPublishSuccess(event.eventType); } catch (_error) { + this.outboxMetricsService.recordPublishFailure(event.eventType); await this.handlePublishFailure(event); } } @@ -82,28 +100,50 @@ export class OutboxPublisher implements OnModuleDestroy { } private async sendToRabbitMQ(event: OutboxEvent) { - await lastValueFrom( - this.client - .emit(RABBITMQ_CONSTANTS.PATTERNS.RECORD_SYNC, event) - .pipe(timeout(5000)), - ); - this.logger.log(`✅ ${event.eventId} Event가 RabbitMQ에 publish`); + try { + await lastValueFrom( + this.client + .emit(RABBITMQ_CONSTANTS.PATTERNS.RECORD_SYNC, event) + .pipe(timeout(5000)), + ); + + this.rabbitMQMetricsService.recordPublishSuccess( + RABBITMQ_CONSTANTS.PATTERNS.RECORD_SYNC, + ); + this.logger.log(`✅ ${event.eventId} Event가 RabbitMQ에 publish`); + } catch (error) { + this.rabbitMQMetricsService.recordPublishFailure( + RABBITMQ_CONSTANTS.PATTERNS.RECORD_SYNC, + ); + throw error; + } } private async handlePublishFailure(outbox: Outbox): Promise { const retryCount = outbox.retryCount + 1; const isDead = retryCount >= this.MAX_RETRY_COUNT; - await this.outboxService.updateStatus( - outbox.id, - isDead ? OutboxStatus.DEAD : OutboxStatus.RETRY, - ); - if (isDead) { + await this.outboxService.updateStatus(outbox.id, OutboxStatus.DEAD); + + this.outboxMetricsService.recordStatusTransition( + OutboxStatus.PENDING, + OutboxStatus.DEAD, + outbox.eventType, + ); + this.outboxMetricsService.recordDeadLetter(outbox.eventType); this.logger.error( `🚨 DLQ: Event ${outbox.id}가 최종 실패 처리되었습니다.`, ); } else { + await this.outboxService.updateStatus(outbox.id, OutboxStatus.RETRY); + + this.outboxMetricsService.recordStatusTransition( + OutboxStatus.PENDING, + OutboxStatus.RETRY, + outbox.eventType, + ); + this.logger.warn( `⚠️ Event ${outbox.id} 발행 실패 (재시도 ${retryCount} / 5)`, ); diff --git a/apps/api/src/outbox/outbox.service.ts b/apps/api/src/outbox/outbox.service.ts index d9999817..55749a8d 100644 --- a/apps/api/src/outbox/outbox.service.ts +++ b/apps/api/src/outbox/outbox.service.ts @@ -2,15 +2,19 @@ import { Injectable } from '@nestjs/common'; import { PrismaService } from '../prisma/prisma.service'; import { Outbox, OutboxStatus, Prisma } from '@prisma/client'; import { OutboxEvent } from '@/common/constants/event-types.constants'; +import { OutboxMetricsService } from '@/infra/monitoring/services/outbox-metrics.service'; @Injectable() export class OutboxService { private readonly MAX_RETRY_COUNT = 5; - constructor(private readonly prisma: PrismaService) {} + constructor( + private readonly prisma: PrismaService, + private readonly outboxMetricsService: OutboxMetricsService, + ) {} async publish(tx: Prisma.TransactionClient, data: OutboxEvent) { - return tx.outbox.create({ + const event = tx.outbox.create({ data: { aggregateType: data.aggregateType, aggregateId: BigInt(data.aggregateId), @@ -20,6 +24,12 @@ export class OutboxService { payload: data.payload as Prisma.InputJsonValue, }, }); + this.outboxMetricsService.recordStatusTransition( + '', + OutboxStatus.PENDING, + data.eventType, + ); + return event; } async getPendingOutboxEvents(): Promise { diff --git a/apps/api/src/records/consumer/record-sync.consumer.ts b/apps/api/src/records/consumer/record-sync.consumer.ts index 15e23e9f..71199b84 100644 --- a/apps/api/src/records/consumer/record-sync.consumer.ts +++ b/apps/api/src/records/consumer/record-sync.consumer.ts @@ -10,12 +10,21 @@ import { } from '../type/record-sync.types'; import { RecordSearchService } from '../services/records-search.service'; import { OUTBOX_EVENT_TYPE } from '@/common/constants/event-types.constants'; +import { RabbitMQMetricsService } from '@/infra/monitoring/services/rabbitmq-metrics.service'; +import { + ESOperation, + ElasticsearchMetricsService, +} from '@/infra/monitoring/services/elasticsearch-metrics.service'; @Controller() export class RecordSyncConsumer { private readonly logger = new Logger(RecordSyncConsumer.name); - constructor(private readonly recordSearchService: RecordSearchService) {} + constructor( + private readonly recordSearchService: RecordSearchService, + private readonly rabbitMQMetricsService: RabbitMQMetricsService, + private readonly esMetricsService: ElasticsearchMetricsService, + ) {} /** * RabbitMQ 메시지 수신 및 처리 @@ -35,44 +44,86 @@ export class RecordSyncConsumer { const channel = context.getChannelRef() as Channel; const message = context.getMessage() as Message; + const pattern = RABBITMQ_CONSTANTS.PATTERNS.RECORD_SYNC; + this.rabbitMQMetricsService.incrementInFlight(pattern); + const startTime = Date.now(); + + try { + await this.processEventByType(event); + // 성공 시 ACK → RabbitMQ에서 메시지 제거 + channel.ack(message); + const duration = (Date.now() - startTime) / 1000; + + this.rabbitMQMetricsService.recordConsumeAck(pattern); + this.rabbitMQMetricsService.recordProcessingDuration( + pattern, + event.eventType, + duration, + ); + this.esMetricsService.recordSyncEvent(event.eventType); + this.logger.log(`✅ Event ${event.eventId} 처리완료 및 ACK`); + } catch (error) { + this.logger.error(`❌ Event ${event.eventId} 처리 실패`, error); + + // 실패 시 NACK (requeue: true) + // → RabbitMQ가 메시지를 큐에 다시 넣음 + channel.nack(message, false, true); + this.rabbitMQMetricsService.recordConsumeNack(pattern); + this.logger.warn(`↩️ Event ${event.eventId} 재시도를 위해 큐에 넣음.`); + } finally { + this.rabbitMQMetricsService.decrementInFlight(pattern); + } + } + + private async processEventByType(event: RecordSyncEvent): Promise { + const startTime = Date.now(); + let operation: ESOperation; + try { - // 이벤트 타입에 따라 처리 switch (event.eventType) { case OUTBOX_EVENT_TYPE.RECORD_CREATED: + operation = 'index'; await this.recordSearchService.indexRecord( event.payload as RecordSyncPayload, ); break; + case OUTBOX_EVENT_TYPE.RECORD_UPDATED: + operation = 'update'; await this.recordSearchService.updateRecord( event.payload as RecordSyncPayload, ); break; + case OUTBOX_EVENT_TYPE.RECORD_FAVORITE_UPDATED: + operation = 'update'; await this.recordSearchService.updateFavoriteInRecord( event.payload as RecordFavoriteSyncPayload, ); break; + case OUTBOX_EVENT_TYPE.RECORD_CONNECTIONS_COUNT_UPDATED: + operation = 'update'; await this.recordSearchService.updateConnectionsCountInRecord( event.payload as RecordConnectionsCountSyncPayload, ); break; + case OUTBOX_EVENT_TYPE.RECORD_DELETED: + operation = 'delete'; await this.recordSearchService.deleteRecord(event.aggregateId); break; + + default: + throw new Error('Unknown event type received'); } - // 성공 시 ACK → RabbitMQ에서 메시지 제거 - channel.ack(message); - this.logger.log(`✅ Event ${event.eventId} 처리완료 및 ACK`); + const duration = (Date.now() - startTime) / 1000; + this.esMetricsService.recordOperationSuccess(operation); + this.esMetricsService.recordOperationDuration(operation, duration); } catch (error) { - this.logger.error(`❌ Event ${event.eventId} 처리 실패`, error); - - // 실패 시 NACK (requeue: true) - // → RabbitMQ가 메시지를 큐에 다시 넣음 - channel.nack(message, false, true); - this.logger.warn(`↩️ Event ${event.eventId} 재시도를 위해 큐에 넣음.`); + this.esMetricsService.recordOperationFailure(operation!); + throw error; } } } diff --git a/apps/api/src/records/records.module.ts b/apps/api/src/records/records.module.ts index 44ccb69c..1c0b7c9a 100644 --- a/apps/api/src/records/records.module.ts +++ b/apps/api/src/records/records.module.ts @@ -18,6 +18,7 @@ import { RecordGraphService } from './services/records-graph.service'; import { RecordLocationService } from './services/records-location.service'; import { RecordQueryService } from './services/records-query.service'; import { RecordImageService } from './services/records-image.service'; +import { PrometheusModule } from '@/infra/monitoring/prometheus.module'; @Module({ imports: [ @@ -27,6 +28,7 @@ import { RecordImageService } from './services/records-image.service'; OutboxModule, TagsModule, ImagesModule, + PrometheusModule, ], controllers: [RecordsController, RecordSyncConsumer], providers: [ diff --git a/apps/api/test/outbox/outbox.publisher.spec.ts b/apps/api/test/outbox/outbox.publisher.spec.ts index 20781f3f..89f5237f 100644 --- a/apps/api/test/outbox/outbox.publisher.spec.ts +++ b/apps/api/test/outbox/outbox.publisher.spec.ts @@ -5,6 +5,8 @@ import { OutboxService } from '../../src/outbox/outbox.service'; import { Outbox, OutboxStatus } from '@prisma/client'; import { RABBITMQ_CONSTANTS } from '@/common/constants/rabbitmq.constants'; import { of, throwError } from 'rxjs'; +import { OutboxMetricsService } from '@/infra/monitoring/services/outbox-metrics.service'; +import { RabbitMQMetricsService } from '@/infra/monitoring/services/rabbitmq-metrics.service'; describe('OutboxPublisher', () => { let publisher: OutboxPublisher; @@ -20,6 +22,19 @@ describe('OutboxPublisher', () => { emit: jest.fn(), }; + const mockOutboxMetricsService = { + recordPublishSuccess: jest.fn(), + recordPublishFailure: jest.fn(), + recordProcessingDuration: jest.fn(), + recordDeadLetter: jest.fn(), + recordStatusTransition: jest.fn(), + }; + + const mockRabbitMQMetricsService = { + recordPublishSuccess: jest.fn(), + recordPublishFailure: jest.fn(), + }; + beforeEach(async () => { jest.clearAllMocks(); @@ -34,6 +49,14 @@ describe('OutboxPublisher', () => { provide: RABBITMQ_CONSTANTS.CLIENTS.RECORD_SYNC_PRODUCER, useValue: mockClientProxy, }, + { + provide: OutboxMetricsService, + useValue: mockOutboxMetricsService, + }, + { + provide: RabbitMQMetricsService, + useValue: mockRabbitMQMetricsService, + }, ], }).compile(); diff --git a/apps/api/test/outbox/outbox.service.spec.ts b/apps/api/test/outbox/outbox.service.spec.ts index eee7d15d..6fd28258 100644 --- a/apps/api/test/outbox/outbox.service.spec.ts +++ b/apps/api/test/outbox/outbox.service.spec.ts @@ -6,6 +6,7 @@ import { AGGREGATE_TYPE, OUTBOX_EVENT_TYPE, } from '@/common/constants/event-types.constants'; +import { OutboxMetricsService } from '@/infra/monitoring/services/outbox-metrics.service'; describe('OutboxService', () => { let service: OutboxService; @@ -17,6 +18,14 @@ describe('OutboxService', () => { }, }; + const mockOutboxMetricsService = { + recordPublishSuccess: jest.fn(), + recordPublishFailure: jest.fn(), + recordProcessingDuration: jest.fn(), + recordDeadLetter: jest.fn(), + recordStatusTransition: jest.fn(), + }; + beforeEach(async () => { jest.clearAllMocks(); @@ -27,6 +36,10 @@ describe('OutboxService', () => { provide: PrismaService, useValue: mockPrismaService, }, + { + provide: OutboxMetricsService, + useValue: mockOutboxMetricsService, + }, ], }).compile(); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3c0f757f..10d56fec 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -94,6 +94,9 @@ importers: '@prisma/client': specifier: ^7.2.0 version: 7.3.0(prisma@7.3.0(@types/react@19.2.10)(magicast@0.5.1)(react-dom@19.2.4(react@19.2.4))(react@19.2.4)(typescript@5.9.3))(typescript@5.9.3) + '@willsoto/nestjs-prometheus': + specifier: ^6.0.2 + version: 6.0.2(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(prom-client@15.1.3) amqp-connection-manager: specifier: ^5.0.0 version: 5.0.0(amqplib@0.10.9) @@ -142,6 +145,9 @@ importers: pg: specifier: ^8.16.3 version: 8.17.2 + prom-client: + specifier: ^15.1.3 + version: 15.1.3 redis: specifier: ^5.10.0 version: 5.10.0 @@ -6196,6 +6202,15 @@ packages: integrity: sha512-kPSSXE6De1XOR820C90RIo2ogvZG+c3KiHzqUoO/F34Y2shGzesfqv7o57xrxovZJH/MetF5UjroJ/R/3isoiw==, } + '@willsoto/nestjs-prometheus@6.0.2': + resolution: + { + integrity: sha512-ePyLZYdIrOOdlOWovzzMisIgviXqhPVzFpSMKNNhn6xajhRHeBsjAzSdpxZTc6pnjR9hw1lNAHyKnKl7lAPaVg==, + } + peerDependencies: + '@nestjs/common': ^7.0.0 || ^8.0.0 || ^9.0.0 || ^10.0.0 || ^11.0.0 + prom-client: ^15.0.0 + '@xtuc/ieee754@1.2.0': resolution: { @@ -6741,6 +6756,12 @@ packages: } engines: { node: '>=8' } + bintrees@1.0.2: + resolution: + { + integrity: sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==, + } + bl@4.1.0: resolution: { @@ -11990,6 +12011,13 @@ packages: typescript: optional: true + prom-client@15.1.3: + resolution: + { + integrity: sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g==, + } + engines: { node: ^16 || ^18 || >=20 } + promise-breaker@6.0.0: resolution: { @@ -13224,6 +13252,12 @@ packages: } engines: { node: '>=6' } + tdigest@0.1.2: + resolution: + { + integrity: sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==, + } + teeny-request@9.0.0: resolution: { @@ -18989,6 +19023,11 @@ snapshots: '@webassemblyjs/ast': 1.14.1 '@xtuc/long': 4.2.2 + '@willsoto/nestjs-prometheus@6.0.2(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(prom-client@15.1.3)': + dependencies: + '@nestjs/common': 11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2) + prom-client: 15.1.3 + '@xtuc/ieee754@1.2.0': {} '@xtuc/long@4.2.2': {} @@ -19327,6 +19366,8 @@ snapshots: binary-extensions@2.3.0: optional: true + bintrees@1.0.2: {} + bl@4.1.0: dependencies: buffer: 5.7.1 @@ -22985,6 +23026,11 @@ snapshots: - react - react-dom + prom-client@15.1.3: + dependencies: + '@opentelemetry/api': 1.9.0 + tdigest: 0.1.2 + promise-breaker@6.0.0: {} promise@7.3.1: @@ -23870,6 +23916,10 @@ snapshots: tapable@2.3.0: {} + tdigest@0.1.2: + dependencies: + bintrees: 1.0.2 + teeny-request@9.0.0: dependencies: http-proxy-agent: 5.0.0