Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added .github/workflows/deploy.yml
Empty file.
Original file line number Diff line number Diff line change
@@ -1,42 +1,58 @@
package com.hf.healthfriend.domain.notification.consumer;

import com.hf.healthfriend.domain.notification.dto.Alarm;
import com.hf.healthfriend.domain.notification.dto.NotificationEvent;
import com.hf.healthfriend.domain.notification.entity.Notification;
import com.hf.healthfriend.domain.notification.repository.NotificationRepository;
import com.hf.healthfriend.domain.notification.service.NotificationSSEService;
import com.hf.healthfriend.domain.notification.util.JsonUtils;
import com.hf.healthfriend.domain.notification.util.NotificationMessageGenerator;
import io.awspring.cloud.sqs.annotation.SqsListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@RequiredArgsConstructor
@Slf4j
public class DBWorker {
public class AlarmWorker {

private final NotificationSSEService notificationSseService;
private final NotificationRepository notificationRepository;
private final NotificationMessageGenerator messageGenerator;
private final JsonUtils jsonUtils;

@SqsListener("${aws.sqs.dbQueueUrl}")
public void consumeDbQueue(String message) {
@SqsListener("${aws.sqs.alarmQueueUrl}")
@Transactional
public void consumeAlarmMessage(String message) {
try {
NotificationEvent event = jsonUtils.deserializeMessage(message);
String noMessage = messageGenerator.generateMessage(event);
String alarmMessage = messageGenerator.generateMessage(event);
Long memberId = event.memberId();

// 1. SSE 알림 전송
Alarm alarm = Alarm.builder()
.event(event)
.alarmMessage(alarmMessage)
.build();
notificationSseService.send(memberId, alarm);
log.info("SSE 알림 전송 완료: {}", event);

// 2. DB 저장
Notification notification = Notification.builder()
.memberId(event.memberId())
.type(event.type())
.targetId(event.targetId())
.message(noMessage)
.message(alarmMessage)
.isRead(false)
.build();

notificationRepository.save(notification);
log.info("DB 저장 성공: {}", event);
log.info("DB 저장 완료: {}", event);

} catch (Exception e) {
log.error("DB 저장 실패: 메시지={}, 에러={}", message, e.getMessage());
log.error("알림 처리 실패: 메시지={}, 에러={}", message, e.getMessage(), e);
throw e; // SQS 자동 재시도를 위해 예외 던지기
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,38 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import software.amazon.awssdk.services.sns.SnsClient;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationPublisher {
private final SnsClient snsClient;

private final SqsAsyncClient sqsAsyncClient;
private final JsonUtils jsonUtils;

@Value("${aws.sns.topicArn}")
private String topicArn;
@Value("${aws.sqs.alarmQueueUrl}")
private String alarmQueueUrl;

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void publishNotification(Long memberId, NotificationType type, String actor, Long targetId) {
NotificationEvent event = new NotificationEvent(memberId, type, actor, targetId);
String message = jsonUtils.serialize(event);
snsClient.publish(builder -> builder
.topicArn(topicArn)
.message(message)
);
log.info("SNS에 알림 이벤트 퍼블리싱: memberId={}, type={}, actor={}, targetId={}",
memberId, type, actor, targetId);

// 트랜잭션이 커밋된 이후에 실행되도록 등록
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
sqsAsyncClient.sendMessage(builder -> builder
.queueUrl(alarmQueueUrl)
.messageBody(message)
);

log.info("SQS에 알림 이벤트 퍼블리싱 (트랜잭션 커밋 후): memberId={}, type={}, actor={}, targetId={}",
memberId, type, actor, targetId);
}
});
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hf.healthfriend.domain.notification.dto.NotificationEvent;
import org.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class JsonUtils {
private static final ObjectMapper objectMapper = new ObjectMapper();
Expand All @@ -20,12 +21,10 @@ public String serialize(Object object) {

public NotificationEvent deserializeMessage(String message) {
try {
JSONObject snsMessage = new JSONObject(message);
String messageContent = snsMessage.getString("Message");
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(messageContent, NotificationEvent.class);
log.info("수신된 메시지: {}", message);
return objectMapper.readValue(message, NotificationEvent.class);
} catch (JsonProcessingException e) {
throw new RuntimeException("메시지 역직렬화 실패", e);
throw new RuntimeException("메시지 역직렬화 실패: " + e.getMessage(), e);
}
}
}
21 changes: 13 additions & 8 deletions src/main/java/com/hf/healthfriend/global/config/AWSConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.hf.healthfriend.global.config;

import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -10,10 +11,9 @@
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;

import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sqs.SqsClient;

@Configuration
public class AWSConfig {
Expand All @@ -30,18 +30,23 @@ public void checkEnvironmentVariables() {
}

@Bean
public SnsClient snsClient() {
return SnsClient.builder()
public SqsAsyncClient sqsClient() {
return SqsAsyncClient.builder()
.region(Region.of(region))
.credentialsProvider(DefaultCredentialsProvider.create())
.build();
}

@Bean
public SqsClient sqsClient() {
return SqsClient.builder()
.region(Region.of(region))
.credentialsProvider(DefaultCredentialsProvider.create())
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
return SqsMessageListenerContainerFactory
.builder()
.configure(sqsContainerOptionsBuilder ->
sqsContainerOptionsBuilder
.maxConcurrentMessages(10) // 컨테이너의 스레드 풀 크기
.maxMessagesPerPoll(10) // 한 번의 폴링 요청으로 수신할 수 있는 최대 메시지 수를 지정
)
.sqsAsyncClient(sqsClient())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class SseWorkerTest {
class AlarmWorkerTest {

@Mock
private NotificationSSEService notificationSseService;
Expand All @@ -26,7 +26,7 @@ class SseWorkerTest {
private JsonUtils jsonUtils;

@InjectMocks
private SseWorker sseWorker;
private AlarmWorker alarmWorker;

@Test
void testConsumeAlarmMessage_Success() {
Expand All @@ -42,7 +42,7 @@ void testConsumeAlarmMessage_Success() {
Mockito.when(messageGenerator.generateMessage(event)).thenReturn("Sse Worker Test");

// When
sseWorker.consumeAlarmMessage(message);
alarmWorker.consumeAlarmMessage(message);

// Then
Mockito.verify(notificationSseService, Mockito.times(1)).send(1L, alarm);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.hf.healthfriend.domain.notification.consumer;

import com.hf.healthfriend.domain.notification.constant.NotificationType;
import com.hf.healthfriend.domain.notification.consumer.DBWorker;
import com.hf.healthfriend.domain.notification.dto.NotificationEvent;
import com.hf.healthfriend.domain.notification.entity.Notification;
import com.hf.healthfriend.domain.notification.repository.NotificationRepository;
Expand Down