diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml new file mode 100644 index 00000000..e69de29b diff --git a/src/main/java/com/hf/healthfriend/domain/notification/consumer/DBWorker.java b/src/main/java/com/hf/healthfriend/domain/notification/consumer/AlarmWorker.java similarity index 55% rename from src/main/java/com/hf/healthfriend/domain/notification/consumer/DBWorker.java rename to src/main/java/com/hf/healthfriend/domain/notification/consumer/AlarmWorker.java index 90fea58f..7df34b9c 100644 --- a/src/main/java/com/hf/healthfriend/domain/notification/consumer/DBWorker.java +++ b/src/main/java/com/hf/healthfriend/domain/notification/consumer/AlarmWorker.java @@ -1,42 +1,58 @@ package com.hf.healthfriend.domain.notification.consumer; +import com.hf.healthfriend.domain.notification.dto.Alarm; import com.hf.healthfriend.domain.notification.dto.NotificationEvent; import com.hf.healthfriend.domain.notification.entity.Notification; import com.hf.healthfriend.domain.notification.repository.NotificationRepository; +import com.hf.healthfriend.domain.notification.service.NotificationSSEService; import com.hf.healthfriend.domain.notification.util.JsonUtils; import com.hf.healthfriend.domain.notification.util.NotificationMessageGenerator; import io.awspring.cloud.sqs.annotation.SqsListener; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; @Component @RequiredArgsConstructor @Slf4j -public class DBWorker { +public class AlarmWorker { + + private final NotificationSSEService notificationSseService; private final NotificationRepository notificationRepository; private final NotificationMessageGenerator messageGenerator; private final JsonUtils jsonUtils; - @SqsListener("${aws.sqs.dbQueueUrl}") - public void consumeDbQueue(String message) { + @SqsListener("${aws.sqs.alarmQueueUrl}") + @Transactional + public void consumeAlarmMessage(String message) { try { NotificationEvent event = jsonUtils.deserializeMessage(message); - String noMessage = messageGenerator.generateMessage(event); + String alarmMessage = messageGenerator.generateMessage(event); + Long memberId = event.memberId(); + + // 1. SSE 알림 전송 + Alarm alarm = Alarm.builder() + .event(event) + .alarmMessage(alarmMessage) + .build(); + notificationSseService.send(memberId, alarm); + log.info("SSE 알림 전송 완료: {}", event); + // 2. DB 저장 Notification notification = Notification.builder() .memberId(event.memberId()) .type(event.type()) .targetId(event.targetId()) - .message(noMessage) + .message(alarmMessage) .isRead(false) .build(); - notificationRepository.save(notification); - log.info("DB 저장 성공: {}", event); + log.info("DB 저장 완료: {}", event); + } catch (Exception e) { - log.error("DB 저장 실패: 메시지={}, 에러={}", message, e.getMessage()); + log.error("알림 처리 실패: 메시지={}, 에러={}", message, e.getMessage(), e); throw e; // SQS 자동 재시도를 위해 예외 던지기 } } -} +} \ No newline at end of file diff --git a/src/main/java/com/hf/healthfriend/domain/notification/consumer/SseWorker.java b/src/main/java/com/hf/healthfriend/domain/notification/consumer/SseWorker.java deleted file mode 100644 index a79a05f7..00000000 --- a/src/main/java/com/hf/healthfriend/domain/notification/consumer/SseWorker.java +++ /dev/null @@ -1,39 +0,0 @@ -package com.hf.healthfriend.domain.notification.consumer; - -import com.hf.healthfriend.domain.notification.dto.Alarm; -import com.hf.healthfriend.domain.notification.dto.NotificationEvent; -import com.hf.healthfriend.domain.notification.service.NotificationSSEService; -import com.hf.healthfriend.domain.notification.util.JsonUtils; -import com.hf.healthfriend.domain.notification.util.NotificationMessageGenerator; -import io.awspring.cloud.sqs.annotation.SqsListener; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -@Component -@RequiredArgsConstructor -@Slf4j -public class SseWorker { - - private final NotificationSSEService notificationSseService; - private final NotificationMessageGenerator messageGenerator; - private final JsonUtils jsonUtils; - - @SqsListener("${aws.sqs.alarmQueueUrl}") - public void consumeAlarmMessage(String message) { - try { - NotificationEvent event = jsonUtils.deserializeMessage(message); - String alarmMessage = messageGenerator.generateMessage(event); - Long memberId = event.memberId(); - Alarm alarm = Alarm.builder() - .event(event) - .alarmMessage(alarmMessage) - .build(); - notificationSseService.send(memberId,alarm); - log.info("클라이언트에 실시간 알림 전송 완료: {}", event); - } catch (Exception e) { - log.error("알림 전송 중 오류 발생: {}", e.getMessage(), e); - throw e; - } - } -} \ No newline at end of file diff --git a/src/main/java/com/hf/healthfriend/domain/notification/publisher/NotificationPublisher.java b/src/main/java/com/hf/healthfriend/domain/notification/publisher/NotificationPublisher.java index a100d1a4..c24e931d 100644 --- a/src/main/java/com/hf/healthfriend/domain/notification/publisher/NotificationPublisher.java +++ b/src/main/java/com/hf/healthfriend/domain/notification/publisher/NotificationPublisher.java @@ -7,29 +7,38 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; -import software.amazon.awssdk.services.sns.SnsClient; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; @Component @RequiredArgsConstructor @Slf4j public class NotificationPublisher { - private final SnsClient snsClient; + + private final SqsAsyncClient sqsAsyncClient; private final JsonUtils jsonUtils; - @Value("${aws.sns.topicArn}") - private String topicArn; + @Value("${aws.sqs.alarmQueueUrl}") + private String alarmQueueUrl; - @Transactional(propagation = Propagation.REQUIRES_NEW) public void publishNotification(Long memberId, NotificationType type, String actor, Long targetId) { NotificationEvent event = new NotificationEvent(memberId, type, actor, targetId); String message = jsonUtils.serialize(event); - snsClient.publish(builder -> builder - .topicArn(topicArn) - .message(message) - ); - log.info("SNS에 알림 이벤트 퍼블리싱: memberId={}, type={}, actor={}, targetId={}", - memberId, type, actor, targetId); + + // 트랜잭션이 커밋된 이후에 실행되도록 등록 + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + sqsAsyncClient.sendMessage(builder -> builder + .queueUrl(alarmQueueUrl) + .messageBody(message) + ); + + log.info("SQS에 알림 이벤트 퍼블리싱 (트랜잭션 커밋 후): memberId={}, type={}, actor={}, targetId={}", + memberId, type, actor, targetId); + } + }); } -} \ No newline at end of file +} + diff --git a/src/main/java/com/hf/healthfriend/domain/notification/util/JsonUtils.java b/src/main/java/com/hf/healthfriend/domain/notification/util/JsonUtils.java index a0b4a6ef..057f100b 100644 --- a/src/main/java/com/hf/healthfriend/domain/notification/util/JsonUtils.java +++ b/src/main/java/com/hf/healthfriend/domain/notification/util/JsonUtils.java @@ -3,9 +3,10 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.hf.healthfriend.domain.notification.dto.NotificationEvent; -import org.json.JSONObject; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +@Slf4j @Component public class JsonUtils { private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -20,12 +21,10 @@ public String serialize(Object object) { public NotificationEvent deserializeMessage(String message) { try { - JSONObject snsMessage = new JSONObject(message); - String messageContent = snsMessage.getString("Message"); - ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.readValue(messageContent, NotificationEvent.class); + log.info("수신된 메시지: {}", message); + return objectMapper.readValue(message, NotificationEvent.class); } catch (JsonProcessingException e) { - throw new RuntimeException("메시지 역직렬화 실패", e); + throw new RuntimeException("메시지 역직렬화 실패: " + e.getMessage(), e); } } } \ No newline at end of file diff --git a/src/main/java/com/hf/healthfriend/global/config/AWSConfig.java b/src/main/java/com/hf/healthfriend/global/config/AWSConfig.java index 4c4f4782..69129ad3 100644 --- a/src/main/java/com/hf/healthfriend/global/config/AWSConfig.java +++ b/src/main/java/com/hf/healthfriend/global/config/AWSConfig.java @@ -1,5 +1,6 @@ package com.hf.healthfriend.global.config; +import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; import jakarta.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -10,10 +11,9 @@ import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.presigner.S3Presigner; -import software.amazon.awssdk.services.sns.SnsClient; -import software.amazon.awssdk.services.sqs.SqsClient; @Configuration public class AWSConfig { @@ -30,18 +30,23 @@ public void checkEnvironmentVariables() { } @Bean - public SnsClient snsClient() { - return SnsClient.builder() + public SqsAsyncClient sqsClient() { + return SqsAsyncClient.builder() .region(Region.of(region)) .credentialsProvider(DefaultCredentialsProvider.create()) .build(); } @Bean - public SqsClient sqsClient() { - return SqsClient.builder() - .region(Region.of(region)) - .credentialsProvider(DefaultCredentialsProvider.create()) + public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory() { + return SqsMessageListenerContainerFactory + .builder() + .configure(sqsContainerOptionsBuilder -> + sqsContainerOptionsBuilder + .maxConcurrentMessages(10) // 컨테이너의 스레드 풀 크기 + .maxMessagesPerPoll(10) // 한 번의 폴링 요청으로 수신할 수 있는 최대 메시지 수를 지정 + ) + .sqsAsyncClient(sqsClient()) .build(); } diff --git a/src/test/java/com/hf/healthfriend/domain/notification/consumer/SseWorkerTest.java b/src/test/java/com/hf/healthfriend/domain/notification/consumer/AlarmWorkerTest.java similarity index 93% rename from src/test/java/com/hf/healthfriend/domain/notification/consumer/SseWorkerTest.java rename to src/test/java/com/hf/healthfriend/domain/notification/consumer/AlarmWorkerTest.java index 47a4f385..07220d0d 100644 --- a/src/test/java/com/hf/healthfriend/domain/notification/consumer/SseWorkerTest.java +++ b/src/test/java/com/hf/healthfriend/domain/notification/consumer/AlarmWorkerTest.java @@ -14,7 +14,7 @@ import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -class SseWorkerTest { +class AlarmWorkerTest { @Mock private NotificationSSEService notificationSseService; @@ -26,7 +26,7 @@ class SseWorkerTest { private JsonUtils jsonUtils; @InjectMocks - private SseWorker sseWorker; + private AlarmWorker alarmWorker; @Test void testConsumeAlarmMessage_Success() { @@ -42,7 +42,7 @@ void testConsumeAlarmMessage_Success() { Mockito.when(messageGenerator.generateMessage(event)).thenReturn("Sse Worker Test"); // When - sseWorker.consumeAlarmMessage(message); + alarmWorker.consumeAlarmMessage(message); // Then Mockito.verify(notificationSseService, Mockito.times(1)).send(1L, alarm); diff --git a/src/test/java/com/hf/healthfriend/domain/notification/consumer/DBWorkerTest.java b/src/test/java/com/hf/healthfriend/domain/notification/consumer/DBWorkerTest.java index 37c340e0..84d4b29b 100644 --- a/src/test/java/com/hf/healthfriend/domain/notification/consumer/DBWorkerTest.java +++ b/src/test/java/com/hf/healthfriend/domain/notification/consumer/DBWorkerTest.java @@ -1,7 +1,6 @@ package com.hf.healthfriend.domain.notification.consumer; import com.hf.healthfriend.domain.notification.constant.NotificationType; -import com.hf.healthfriend.domain.notification.consumer.DBWorker; import com.hf.healthfriend.domain.notification.dto.NotificationEvent; import com.hf.healthfriend.domain.notification.entity.Notification; import com.hf.healthfriend.domain.notification.repository.NotificationRepository;