Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.bbebig.chatserver.domain.chat.repository.SessionManager;
import com.bbebig.chatserver.domain.chat.service.KafkaProducerService;
import com.bbebig.chatserver.global.util.SequenceRedisGenerator;
import com.bbebig.commonmodule.global.response.code.error.ErrorStatus;
import com.bbebig.commonmodule.global.response.exception.ErrorHandler;
import com.bbebig.commonmodule.kafka.dto.ChannelEventDto;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.validation.Valid;
Expand All @@ -26,12 +29,14 @@ public class ChannelEventController {

private final ObjectMapper objectMapper;

private final SequenceRedisGenerator sequenceRedisGenerator;

// 현재 보고있는 채널 변경 이벤트 처리
@MessageMapping("/channel/enter")
public void enterChannel(Message<?> message) throws IOException {
ChannelEventDto channelEventDto = extractChannelEventDto(message);
log.info("[Chat] ChannelEventController: 채널 입장 이벤트. memberId = {}, channelId = {}, sessionId = {}",
channelEventDto.getMemberId(), channelEventDto.getChannelId(), channelEventDto.getSessionId());
log.info("[Chat] ChannelEventController: 채널 입장 이벤트. memberId = {}, channelId = {}, sessionId = {}, channelEventDto = {}",
channelEventDto.getMemberId(), channelEventDto.getChannelId(), channelEventDto.getSessionId(), channelEventDto);
validateTimestamps(channelEventDto);
kafkaProducerService.sendMessageForChannel(channelEventDto);
}
Expand All @@ -40,9 +45,11 @@ public void enterChannel(Message<?> message) throws IOException {
@MessageMapping("/channel/leave")
public void leaveChannel(Message<?> message) throws IOException {
ChannelEventDto channelEventDto = extractChannelEventDto(message);
log.info("[Chat] ChannelEventController: 채널 퇴장 이벤트. memberId = {}, channelId = {}, sessionId = {}",
channelEventDto.getMemberId(), channelEventDto.getChannelId(), channelEventDto.getSessionId());
log.info("[Chat] ChannelEventController: 채널 퇴장 이벤트. memberId = {}, channelId = {}, sessionId = {}, channelEventDto = {}",
channelEventDto.getMemberId(), channelEventDto.getChannelId(), channelEventDto.getSessionId(), channelEventDto);
validateTimestamps(channelEventDto);

validateLastInfo(channelEventDto);
kafkaProducerService.sendMessageForChannel(channelEventDto);
}

Expand All @@ -54,6 +61,21 @@ private void validateTimestamps(ChannelEventDto channelEventDto) {
}
}

private void validateLastInfo(ChannelEventDto channelEventDto) {
if (channelEventDto.getLastReadMessageId() == null || channelEventDto.getLastReadSequence() == null) {
log.warn("[Chat] ChannelEventController: lastReadMessageId 또는 lastReadSequence 값이 null이므로 기본값으로 설정. memberId: {}, channelId: {}", channelEventDto.getMemberId(), channelEventDto.getChannelId());
Long seq = sequenceRedisGenerator.getSeqForServerChannel(channelEventDto.getChannelId());
if (seq != null) {
channelEventDto.setLastReadMessageId(seq);
channelEventDto.setLastReadSequence(seq);
} else {
log.error("[Chat] ChannelEventController: lastReadMessageId 또는 lastReadSequence 값이 null이고 Redis에서 시퀀스를 가져올 수 없음. memberId: {}, channelId: {}", channelEventDto.getMemberId(), channelEventDto.getChannelId());
}
channelEventDto.setLastReadMessageId(0L);
channelEventDto.setLastReadSequence(0L);
}
}

private ChannelEventDto extractChannelEventDto(Message<?> message) throws IOException {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(message);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.context.WebApplicationContext;

Expand Down Expand Up @@ -94,14 +93,15 @@ public void postSend(Message<?> message, MessageChannel channel, boolean sent) {

StompCommand command = headerAccessor.getCommand();

log.info("[Chat] StompHandler Post send: sessionId={}, command={}", sessionId, command);
log.info("[Chat] StompHandler Post send: sessionId={}, command={}, memberId={}", sessionId, command, sessionManager.findMemberIdBySessionId(sessionId));

if (command == null) {
log.error("[Chat] StompHandler: postSend - command 정보 없음. 전체 헤더 정보: {}", headerAccessor.toMap());
return;
}
if (StompCommand.CONNECT.equals(command)) {
Long memberId = Long.parseLong(Objects.requireNonNull(headerAccessor.getFirstNativeHeader("MemberId")));
log.info("[Chat] StompHandler Post send - CONNECT: sessionId={}, command={}, memberId={}", sessionId, command, memberId);
ConnectionEventDto connectionEventDto = ConnectionEventDto.builder()
.memberId(memberId)
.type(ConnectionEventType.CONNECT)
Expand All @@ -115,6 +115,7 @@ public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
.build();

kafkaProducerService.sendMessageForSession(connectionEventDto);
// sessionManager.saveConnectSessionInfo(sessionId, memberId);
} else if (StompCommand.DISCONNECT.equals(command)) {
if (sessionId == null) {
log.error("[Chat] StompHandler: DISCONNECT 요청 시 세션 ID 없음");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ public void configureMessageBroker(MessageBrokerRegistry registry) {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {

registry.addEndpoint("/ws-mobile") // 클라이언트가 연결할 엔드포인트 설정
.setAllowedOriginPatterns("*");

registry.addEndpoint("/ws-chat") // 클라이언트가 연결할 엔드포인트 설정
.setAllowedOrigins("*");
.setAllowedOriginPatterns("*");

// 웹소켓을 사용할 수 없는 환경에서 sockJS 지원
registry.addEndpoint("/ws-chat") // 클라이언트가 연결할 엔드포인트 설정
.setAllowedOrigins("*")
.setAllowedOriginPatterns("*")
.withSockJS();

registry.setErrorHandler(stompErrorHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@ public Long nextSeqForServerChannel(Long channelId) {
return redisSetTemplate.opsForValue().increment(redisKey);
}

public Long getSeqForServerChannel(Long channelId) {
String redisKey = ServerRedisKeys.getServerChannelSequenceKey(channelId);
return redisSetTemplate.opsForValue().get(redisKey);
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum ErrorStatus implements BaseErrorCode {
CHAT_ROOM_NOT_FOUND(HttpStatus.BAD_REQUEST, "CHAT401", "채팅방이 존재하지 않습니다."),
CHAT_ROOM_ALREADY_EXIST(HttpStatus.BAD_REQUEST, "CHAT402", "이미 존재하는 채팅방입니다."),
CHAT_ROOM_MEMBER_NOT_FOUND(HttpStatus.BAD_REQUEST, "CHAT403", "채팅방에 참여한 멤버가 존재하지 않습니다."),
CHANNEL_LEAVE_LAST_INFO_NOT_VALID(HttpStatus.BAD_REQUEST, "CHAT404", "채널 퇴장 시 마지막 정보가 유효하지 않습니다."),

// SERVICE
SERVER_NOT_FOUND(HttpStatus.NOT_FOUND, "SERVICE401", "서버를 찾을 수 없습니다."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@ public class ChatMessageDto {
private List<Long> targetMemberIds; // 메시지 대상자 목록 (DM일 경우)

// CREATE 시 필요
@PastOrPresent(message = "생성 시간은 현재 시간 이전이어야 합니다.")
private LocalDateTime createdAt;

// UPDATE 시 필요
@PastOrPresent(message = "수정 시간은 현재 시간 이전이어야 합니다.")
private LocalDateTime updatedAt;

// @JsonCreator와 @JsonProperty를 사용한 생성자
Expand Down