Skip to content

Commit 178370a

Browse files
authored
Merge pull request #236 from minseo003/develop-notification
refactor: 레빗엠큐 멀티쓰레딩으로 전환
2 parents 9655e2e + ec3cbfe commit 178370a

10 files changed

Lines changed: 144 additions & 126 deletions

File tree

src/main/java/com/project/Teaming/domain/mentoring/service/MentoringBoardService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ public void updateMentoringPost(Long postId, BoardRequest dto) {
196196
() -> new BusinessException(ErrorCode.NO_AUTHORITY));
197197

198198
mentoringBoard.updateBoard(dto);
199+
mentoringBoardRepository.flush();
199200

200201
} catch (OptimisticLockException e) {
201202
throw new BusinessException(ErrorCode.CONFLICT);

src/main/java/com/project/Teaming/domain/mentoring/service/RedisApplicantManagementService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ private String generateApplicantKey(Long teamId) {
3434
public void saveApplicantWithTTL(Long teamId, TeamParticipationResponse response, LocalDate deadLine) {
3535
// 현재 시간과 deadLine 간의 차이를 계산
3636
LocalDateTime now = LocalDateTime.now();
37-
LocalDateTime deadlineDateTime = deadLine.atStartOfDay();
37+
LocalDateTime deadlineDateTime = deadLine.atTime(23, 59, 59);
3838
Duration duration = Duration.between(now, deadlineDateTime);
3939

4040
long ttlSeconds = duration.getSeconds();

src/main/java/com/project/Teaming/global/config/AsyncConfig.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@ public class AsyncConfig {
1515
@Bean(name = "notificationExecutor")
1616
public Executor notificationExecutor() {
1717
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
18-
executor.setCorePoolSize(50); // 최소 스레드 개수
19-
executor.setMaxPoolSize(300); // 최대 스레드 개수
20-
executor.setQueueCapacity(3000); // 대기 큐 크기
21-
executor.setKeepAliveSeconds(30); // Idle 상태의 스레드를 유지하는 시간
18+
executor.setCorePoolSize(100); // 최소 스레드 개수
19+
executor.setMaxPoolSize(150); // 최대 스레드 개수
20+
executor.setQueueCapacity(200); // 대기 큐 크기
2221
executor.setThreadNamePrefix("Notification-");
2322
executor.initialize();
2423
return executor;

src/main/java/com/project/Teaming/global/config/RedisConfig.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,6 @@ public RedisTemplate<String, Object> participationRedisTemplate() {
8585
return template;
8686
}
8787

88-
// sse 연결한 사용자 공유용
89-
@Bean
90-
public StringRedisTemplate stringRedisTemplate() {
91-
StringRedisTemplate template = new StringRedisTemplate();
92-
template.setConnectionFactory(redisConnectionFactory(3));
93-
return template;
94-
}
95-
9688
@Bean
9789
public ObjectMapper redisObjectMapper() {
9890
ObjectMapper objectMapper = new ObjectMapper();
Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,65 @@
11
package com.project.Teaming.global.messageQueue.config;
22

3-
import org.springframework.amqp.core.Binding;
4-
import org.springframework.amqp.core.BindingBuilder;
5-
import org.springframework.amqp.core.DirectExchange;
6-
import org.springframework.amqp.core.Queue;
7-
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import org.springframework.amqp.core.*;
5+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
6+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
87
import org.springframework.amqp.rabbit.core.RabbitTemplate;
98
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
9+
import org.springframework.beans.factory.annotation.Value;
10+
import org.springframework.boot.ApplicationRunner;
1011
import org.springframework.context.annotation.Bean;
1112
import org.springframework.context.annotation.Configuration;
1213

14+
1315
@Configuration
1416
public class RabbitMQConfig {
1517

16-
private final String serverId = System.getenv("SERVER_ID"); // 현재 서버 ID 가져오기
17-
18+
@Value("${server.id}") // application.yml에서 설정된 SERVER_ID 값을 주입
19+
private String serverId;
1820
@Bean
1921
public Queue queue() {
20-
return new Queue(serverId, true); // 서버별 큐 생성
22+
return new Queue(serverId, true);
2123
}
22-
2324
@Bean
24-
public DirectExchange exchange() {
25-
return new DirectExchange("notification.exchange");
25+
public Binding binding(FanoutExchange exchange, Queue queue) {
26+
return BindingBuilder.bind(queue).to(exchange);
2627
}
27-
2828
@Bean
29-
public Binding binding(Queue queue, DirectExchange exchange) {
30-
return BindingBuilder.bind(queue).to(exchange).with(serverId); // 서버별 Routing Key 바인딩
29+
public FanoutExchange exchange() {
30+
return new FanoutExchange("notification.exchange");
3131
}
3232
@Bean
3333
public Jackson2JsonMessageConverter jsonMessageConverter() {
34-
return new Jackson2JsonMessageConverter(); // JSON 변환기 추가
34+
ObjectMapper objectMapper = new ObjectMapper();
35+
return new Jackson2JsonMessageConverter(objectMapper);
36+
}
37+
38+
@Bean
39+
public RabbitAdmin rabbitAdmin(CachingConnectionFactory cachingConnectionFactory) {
40+
return new RabbitAdmin(cachingConnectionFactory);
3541
}
3642

3743
@Bean
38-
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
39-
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
40-
rabbitTemplate.setMessageConverter(jsonMessageConverter());
44+
public ApplicationRunner runner(RabbitAdmin rabbitAdmin) {
45+
return args -> rabbitAdmin.initialize();
46+
}
47+
@Bean
48+
public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory) {
49+
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
50+
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); // ✅ JSON 메시지 변환기 적용
4151
return rabbitTemplate;
4252
}
53+
54+
@Bean
55+
public CachingConnectionFactory cachingConnectionFactory() {
56+
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost");
57+
cachingConnectionFactory.setUsername("admin");
58+
cachingConnectionFactory.setPassword("password");
59+
cachingConnectionFactory.setChannelCacheSize(50);
60+
cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
61+
return cachingConnectionFactory;
62+
}
4363
}
4464

4565

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.project.Teaming.global.messageQueue.config;
2+
3+
4+
import org.springframework.amqp.core.AcknowledgeMode;
5+
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
6+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
7+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
8+
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
9+
import org.springframework.context.annotation.Bean;
10+
import org.springframework.context.annotation.Configuration;
11+
12+
@Configuration
13+
@EnableRabbit
14+
public class RabbitMQConsumerConfig {
15+
16+
@Bean
17+
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory,
18+
Jackson2JsonMessageConverter jsonMessageConverter) {
19+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
20+
factory.setConnectionFactory(cachingConnectionFactory);
21+
factory.setConcurrentConsumers(30); // 동시에 메시지 처리 할 개수
22+
factory.setMaxConcurrentConsumers(50); // 최대 확장 가능 개수
23+
factory.setPrefetchCount(10); // 컨슈머가 한 번에 가져올 메시지 개수 설정
24+
factory.setMessageConverter(jsonMessageConverter);
25+
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 수동 ACK 모드 설정
26+
return factory;
27+
}
28+
}

src/main/java/com/project/Teaming/global/messageQueue/consumer/RabbitMQNotificationConsumer.java

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,94 @@
11
package com.project.Teaming.global.messageQueue.consumer;
22

3-
import com.fasterxml.jackson.databind.JsonNode;
4-
import com.fasterxml.jackson.databind.ObjectMapper;
3+
import com.project.Teaming.global.event.NotificationEvent;
54
import com.project.Teaming.global.sse.dto.EventPayload;
65
import com.project.Teaming.global.sse.dto.EventWithTeamPayload;
76
import com.project.Teaming.global.sse.entity.Notification;
7+
import com.project.Teaming.global.sse.repository.EmitterRepository;
8+
import com.project.Teaming.global.sse.repository.NotificationRepository;
89
import com.project.Teaming.global.sse.service.SseEmitterService;
10+
import com.rabbitmq.client.Channel;
911
import lombok.RequiredArgsConstructor;
1012
import lombok.extern.slf4j.Slf4j;
1113
import org.springframework.amqp.rabbit.annotation.RabbitListener;
14+
import org.springframework.amqp.support.AmqpHeaders;
15+
import org.springframework.beans.factory.annotation.Value;
16+
import org.springframework.messaging.handler.annotation.Header;
1217
import org.springframework.stereotype.Component;
18+
import org.springframework.transaction.annotation.Transactional;
19+
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
1324

1425
@Component
1526
@RequiredArgsConstructor
1627
@Slf4j
1728
public class RabbitMQNotificationConsumer {
1829

1930
private final SseEmitterService sseEmitterService;
20-
private final ObjectMapper objectMapper;
31+
private final EmitterRepository emitterRepository;
32+
private final NotificationRepository notificationRepository;
33+
34+
@Value("${server.id}") // application.yml에서 설정된 SERVER_ID 값을 주입
35+
private String serverId;
36+
37+
@Transactional(readOnly = true)
38+
@RabbitListener(queues = {"${server.id}"},
39+
containerFactory = "rabbitListenerContainerFactory")
40+
public void receiveNotification(NotificationEvent event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
2141

22-
@RabbitListener(queues = "#{@environment.getProperty('server.id')}", concurrency = "3")
23-
public void receiveNotification(String messageBody) {
2442
try {
25-
Object payload = determinePayloadType(messageBody);
26-
if (payload == null) return;
43+
List<Notification> notifications = notificationRepository.findAllById(event.getNotificationIds());
44+
notifications.forEach(notification -> {
45+
Long userId = notification.getUser().getId();
46+
SseEmitter emitter = emitterRepository.findById(userId);
2747

28-
log.info("✅ RabbitMQ 알림 이벤트 수신: {}", payload);
29-
sendNotification(payload);
48+
if (emitter != null) {
49+
// ✅ SSE 연결된 서버라면 알림 전송
50+
sendNotification(userId, notification);
51+
log.info("🚀 SSE 알림 전송 완료 → User: {}", userId);
52+
} else {
53+
log.warn("⚠️ SSE 연결 없음 ");
54+
}
55+
});
56+
channel.basicAck(tag, false);
3057
} catch (Exception e) {
58+
try {
59+
channel.basicAck(tag, false); // ❌ 예외 발생해도 재시도 안 하므로 ACK
60+
} catch (IOException ioException) {
61+
log.error("❌ RabbitMQ basicAck 실패: {}", ioException.getMessage(), ioException);
62+
}
3163
log.error("❌ RabbitMQ 알림 이벤트 처리 실패: {}", e.getMessage(), e);
3264
}
3365
}
3466

35-
private Object determinePayloadType(String messageBody) throws Exception {
36-
JsonNode jsonNode = objectMapper.readTree(messageBody);
37-
return jsonNode.has("teamId")
38-
? objectMapper.readValue(messageBody, EventWithTeamPayload.class)
39-
: objectMapper.readValue(messageBody, EventPayload.class);
40-
}
4167

42-
private void sendNotification(Object payload) {
43-
try {
44-
if (payload instanceof EventWithTeamPayload eventWithTeamPayload) {
45-
sseEmitterService.sendWithTeamId(eventWithTeamPayload.getUserId(), eventWithTeamPayload);
46-
} else if (payload instanceof EventPayload eventPayload) {
47-
sseEmitterService.send(eventPayload.getUserId(), eventPayload);
68+
private void sendNotification(Long userId, Notification notification){
69+
try {
70+
Object payload = (notification.getTeamId() != null) ?
71+
EventWithTeamPayload.builder()
72+
.userId(userId)
73+
.type(notification.getType())
74+
.category(notification.getCategory())
75+
.teamId(notification.getTeamId())
76+
.createdAt(notification.getCreatedAt().toString())
77+
.message(notification.getMessage())
78+
.isRead(notification.isRead())
79+
.build() :
80+
EventPayload.builder()
81+
.userId(userId)
82+
.type(notification.getType())
83+
.category(notification.getCategory())
84+
.createdAt(notification.getCreatedAt().toString())
85+
.message(notification.getMessage())
86+
.isRead(notification.isRead())
87+
.build();
88+
89+
sseEmitterService.sendToClient(userId, payload);
90+
} catch (Exception e) {
91+
log.error("❌ SSE 알림 전송 실패: {}", e.getMessage(), e);
4892
}
49-
} catch (Exception e) {
50-
log.error("❌ SSE 알림 전송 실패 : {}", e.getMessage(), e);
5193
}
5294
}
53-
}
Lines changed: 6 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,12 @@
11
package com.project.Teaming.global.messageQueue.publisher;
22

3-
import com.fasterxml.jackson.databind.ObjectMapper;
43
import com.project.Teaming.global.event.NotificationEvent;
5-
import com.project.Teaming.global.sse.dto.EventPayload;
6-
import com.project.Teaming.global.sse.dto.EventWithTeamPayload;
7-
import com.project.Teaming.global.sse.entity.Notification;
8-
import com.project.Teaming.global.sse.repository.NotificationRepository;
94
import lombok.RequiredArgsConstructor;
105
import lombok.extern.slf4j.Slf4j;
116
import org.springframework.amqp.rabbit.core.RabbitTemplate;
12-
import org.springframework.data.redis.core.StringRedisTemplate;
137
import org.springframework.stereotype.Service;
148
import org.springframework.transaction.annotation.Transactional;
159

16-
import java.util.List;
1710

1811
@Service
1912
@RequiredArgsConstructor
@@ -22,60 +15,16 @@ public class RabbitMQNotificationPublisher {
2215

2316
private final RabbitTemplate rabbitTemplate;
2417
private static final String EXCHANGE_NAME = "notification.exchange";
25-
private final StringRedisTemplate stringRedisTemplate;
26-
private final NotificationRepository notificationRepository;
27-
private final ObjectMapper objectMapper;
2818

2919
@Transactional(readOnly = true)
3020
public void sendNotificationEvent(NotificationEvent event) {
3121
log.info("📢 RabbitMQ 알림 이벤트 발행: {}", event.getNotificationIds());
3222

33-
List<Notification> notifications = notificationRepository.findAllById(event.getNotificationIds());
34-
if (notifications.isEmpty()) {
35-
log.warn("⚠️ 해당 알림 ID들에 대한 알림 없음 - 전송 중단");
36-
return;
23+
try {
24+
rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", event);
25+
log.info("🚀 RabbitMQ 메시지 발행 완료");
26+
} catch (Exception e) {
27+
log.error("❌ RabbitMQ 메시지 발행 실패: {}", e.getMessage(), e);
3728
}
38-
39-
notifications.forEach(notification -> {
40-
Long userId = notification.getUser().getId();
41-
String targetServerId = stringRedisTemplate.opsForValue().get("sse_server:" + userId);
42-
43-
// ✅ Redis에 서버 정보가 없으면 "오프라인 사용자 큐"에 저장 가능
44-
if (targetServerId == null) {
45-
log.warn("⚠️ Redis에 사용자({})의 SSE 연결 정보 없음 - 알림 전송 중단", userId);
46-
return;
47-
}
48-
49-
try {
50-
Object payload;
51-
if (notification.getTeamId() != null) {
52-
payload = EventWithTeamPayload.builder()
53-
.userId(userId)
54-
.type(notification.getType())
55-
.category(notification.getCategory())
56-
.teamId(notification.getTeamId())
57-
.createdAt(notification.getCreatedAt().toString())
58-
.message(notification.getMessage())
59-
.isRead(notification.isRead())
60-
.build();
61-
} else {
62-
payload = EventPayload.builder()
63-
.userId(userId)
64-
.type(notification.getType())
65-
.category(notification.getCategory())
66-
.createdAt(notification.getCreatedAt().toString())
67-
.message(notification.getMessage())
68-
.isRead(notification.isRead())
69-
.build();
70-
}
71-
String jsonPayload = objectMapper.writeValueAsString(payload);
72-
73-
// ✅ 올바른 서버의 큐로 메시지 전송
74-
log.info("🚀 RabbitMQ 메시지 발행 → 서버: {} | User: {} | 알림 ID: {}", targetServerId, userId, notification.getId());
75-
rabbitTemplate.convertAndSend(EXCHANGE_NAME, targetServerId, jsonPayload);
76-
} catch (Exception e) {
77-
log.error("❌ RabbitMQ 메시지 변환 실패: {}", e.getMessage(), e);
78-
}
79-
});
8029
}
81-
}
30+
}

src/main/java/com/project/Teaming/global/sse/repository/EmitterRepository.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.project.Teaming.global.error.ErrorCode;
88
import com.project.Teaming.global.error.exception.BusinessException;
99
import lombok.RequiredArgsConstructor;
10+
import org.springframework.beans.factory.annotation.Value;
1011
import org.springframework.data.redis.core.StringRedisTemplate;
1112
import org.springframework.stereotype.Repository;
1213
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -15,23 +16,17 @@
1516
@RequiredArgsConstructor
1617
public class EmitterRepository {
1718
private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();
18-
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60 ;
19-
private final StringRedisTemplate stringRedisTemplate;
20-
private static final String SSE_EMITTER_KEY = "sse_server:";
21-
private final String serverId = System.getenv("SERVER_ID");
2219

2320
public SseEmitter findById(Long userId) {
2421
return emitters.get(userId);
2522
}
2623

2724
public SseEmitter save(Long userId, SseEmitter sseEmitter) {
2825
emitters.put(userId, sseEmitter);
29-
stringRedisTemplate.opsForValue().set(SSE_EMITTER_KEY + userId, serverId, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
3026
return emitters.get(userId);
3127
}
3228

3329
public void deleteById(Long userId) {
3430
emitters.remove(userId);
35-
stringRedisTemplate.delete(SSE_EMITTER_KEY + userId);
3631
}
3732
}

0 commit comments

Comments
 (0)