-
Notifications
You must be signed in to change notification settings - Fork 3
채팅서버 카프카 연결 #164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
채팅서버 카프카 연결 #164
Changes from 11 commits
aec9023
56b73f1
b0ba370
ac73672
2fea881
3476bc0
44ab1cf
055e73e
bf377d1
87e7f82
d449b71
5c40767
5779b9f
ad5d6d6
9f8094c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,24 +1,26 @@ | ||
| package com.jootalkpia.chat_server.controller; | ||
|
|
||
| import com.jootalkpia.chat_server.dto.ChatMessageRequest; | ||
| import com.jootalkpia.chat_server.dto.ChatMessageResponse; | ||
| import com.jootalkpia.chat_server.dto.messgaeDto.ChatMessageRequest; | ||
| import com.jootalkpia.chat_server.dto.ChatMessageToKafka; | ||
| import com.jootalkpia.chat_server.service.ChatService; | ||
| import com.jootalkpia.chat_server.service.KafkaProducer; | ||
| import java.util.List; | ||
| import lombok.RequiredArgsConstructor; | ||
| import org.springframework.messaging.handler.annotation.DestinationVariable; | ||
| import org.springframework.messaging.handler.annotation.MessageMapping; | ||
| import org.springframework.messaging.handler.annotation.SendTo; | ||
| import org.springframework.web.bind.annotation.RestController; | ||
|
|
||
| @RestController | ||
| @RequiredArgsConstructor | ||
| public class ChatController { | ||
|
|
||
| private final KafkaProducer kafkaProducer; | ||
| private final ChatService chatService; | ||
|
|
||
| @MessageMapping("/chat.{channelId}") | ||
| @SendTo("/subscribe/chat.{channelId}") | ||
| public ChatMessageResponse sendMessage(ChatMessageRequest request, @DestinationVariable Long channelId) { | ||
| return chatService.createMessage(request.userId(), request.content()); | ||
| public void sendMessage(ChatMessageRequest request, @DestinationVariable Long channelId) { | ||
| List chatMessage = chatService.createMessage(request); //Type 건들지말것 | ||
| ChatMessageToKafka chatMessageToKafka = new ChatMessageToKafka(channelId, chatMessage); | ||
| kafkaProducer.sendChatMessage(chatMessageToKafka, channelId); | ||
| } | ||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| package com.jootalkpia.chat_server.domain; | ||
|
|
||
| import jakarta.persistence.Entity; | ||
| import jakarta.persistence.GeneratedValue; | ||
| import jakarta.persistence.GenerationType; | ||
| import java.time.LocalDateTime; | ||
| import lombok.Getter; | ||
| import lombok.Setter; | ||
| import jakarta.persistence.Id; | ||
|
|
||
| @Entity | ||
| @Getter | ||
| @Setter | ||
| public class Files { | ||
|
||
| @Id | ||
| @GeneratedValue(strategy = GenerationType.IDENTITY) | ||
| private Long fileId; | ||
|
|
||
| private String url; | ||
| private String urlThumbnail; | ||
| private String fileType; | ||
| private Long fileSize; | ||
| private String mimeType; | ||
| private LocalDateTime createdAt; | ||
| private LocalDateTime updatedAt; | ||
| } | ||
This file was deleted.
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,9 @@ | ||
| package com.jootalkpia.chat_server.dto; | ||
|
|
||
| import java.util.List; | ||
|
|
||
| public record ChatMessageToKafka( | ||
| Long userId, | ||
| String username, | ||
| String content | ||
| Long channelId, | ||
| List chatMessage //type 건들지말것 | ||
| ) { | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| package com.jootalkpia.chat_server.dto.messgaeDto; | ||
|
|
||
| import java.util.List; | ||
|
|
||
| public record ChatMessageRequest( | ||
| Long userId, | ||
| String content, | ||
| List<Long> attachmentList | ||
| ) { | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| package com.jootalkpia.chat_server.dto.messgaeDto; | ||
|
|
||
| public record CommonResponse( | ||
| Long userId, | ||
| String userNickname, | ||
| String userProfileImage | ||
| ) implements MessageResponse { | ||
| @Override | ||
| public String type() { | ||
| return "COMMON"; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| package com.jootalkpia.chat_server.dto.messgaeDto; | ||
|
|
||
| public record ImageResponse( | ||
| String url | ||
| ) implements MessageResponse { | ||
| @Override | ||
| public String type() { | ||
| return "IMAGE"; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| package com.jootalkpia.chat_server.dto.messgaeDto; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonSubTypes; | ||
| import com.fasterxml.jackson.annotation.JsonTypeInfo; | ||
|
|
||
| @JsonTypeInfo( | ||
| use = JsonTypeInfo.Id.NAME, | ||
| include = JsonTypeInfo.As.PROPERTY, | ||
| property = "type" | ||
| ) | ||
| @JsonSubTypes({ | ||
| @JsonSubTypes.Type(value = CommonResponse.class, name = "COMMON"), | ||
| @JsonSubTypes.Type(value = TextResponse.class, name = "TEXT"), | ||
| @JsonSubTypes.Type(value = VideoResponse.class, name = "VIDEO"), | ||
| @JsonSubTypes.Type(value = ImageResponse.class, name = "IMAGE") | ||
| }) | ||
| public interface MessageResponse { | ||
| String type(); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| package com.jootalkpia.chat_server.dto.messgaeDto; | ||
|
|
||
| public record TextResponse( | ||
| String content | ||
| ) implements MessageResponse { | ||
| @Override | ||
| public String type() { | ||
| return "TEXT"; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| package com.jootalkpia.chat_server.dto.messgaeDto; | ||
|
|
||
| public record VideoResponse( | ||
| String urlThumbnail, | ||
| String url | ||
| ) implements MessageResponse { | ||
| @Override | ||
| public String type() { | ||
| return "VIDEO"; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| package com.jootalkpia.chat_server.repository; | ||
|
|
||
| import com.jootalkpia.chat_server.domain.Files; | ||
| import org.springframework.data.jpa.repository.JpaRepository; | ||
| import org.springframework.stereotype.Repository; | ||
|
|
||
| @Repository | ||
| public interface FileRepository extends JpaRepository<Files, Long> { | ||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,17 @@ | ||
| package com.jootalkpia.chat_server.service; | ||
|
|
||
| import com.jootalkpia.chat_server.domain.Files; | ||
| import com.jootalkpia.chat_server.domain.User; | ||
| import com.jootalkpia.chat_server.dto.ChatMessageResponse; | ||
| import com.jootalkpia.chat_server.dto.messgaeDto.ChatMessageRequest; | ||
| import com.jootalkpia.chat_server.dto.messgaeDto.CommonResponse; | ||
| import com.jootalkpia.chat_server.dto.messgaeDto.ImageResponse; | ||
| import com.jootalkpia.chat_server.dto.messgaeDto.MessageResponse; | ||
| import com.jootalkpia.chat_server.dto.messgaeDto.TextResponse; | ||
| import com.jootalkpia.chat_server.dto.messgaeDto.VideoResponse; | ||
| import com.jootalkpia.chat_server.repository.FileRepository; | ||
| import com.jootalkpia.chat_server.repository.UserRepository; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import lombok.RequiredArgsConstructor; | ||
| import org.springframework.stereotype.Service; | ||
|
|
||
|
|
@@ -11,9 +20,32 @@ | |
| public class ChatService { | ||
|
|
||
| private final UserRepository userRepository; | ||
| private final FileRepository fileRepository; | ||
|
|
||
| public ChatMessageResponse createMessage(Long userId, String content){ | ||
| User user = userRepository.findByUserId(userId); | ||
| return new ChatMessageResponse(user.getNickname(),content); | ||
| public List<MessageResponse> createMessage(ChatMessageRequest request) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 함수의 분리를 고민해보셔도 좋을 것 같아요!
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 해당 부분은 추후 리팩토링 진행하겠습니다! |
||
| List<MessageResponse> response = new ArrayList<>(); | ||
|
|
||
| User user = userRepository.findByUserId(request.userId()); | ||
| response.add(new CommonResponse(user.getUserId(), user.getNickname(), user.getProfileImage())); | ||
|
|
||
| String content = request.content(); | ||
| if (content != null && !content.isEmpty()) { | ||
| response.add(new TextResponse(content)); | ||
| } | ||
|
|
||
| List<Long> attachmentList = request.attachmentList(); | ||
| if (attachmentList != null && !attachmentList.isEmpty()) { | ||
| for (Long fileId : attachmentList) { | ||
| Files file = fileRepository.findById(fileId) | ||
| .orElseThrow(() -> new IllegalArgumentException("File not found for fileId: " + fileId)); // todo : 예외 처리 추가 | ||
| switch (file.getFileType()) { | ||
| case "IMAGE" -> response.add(new ImageResponse(file.getUrl())); | ||
| case "VIDEO" -> response.add(new VideoResponse(file.getUrlThumbnail(),file.getUrl())); | ||
| default -> throw new IllegalArgumentException("Unsupported file type: " + file.getFileType()); // todo : 예외 처리 추가 | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return response; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,13 +1,13 @@ | ||
| package com.jootalkpia.chat_server.service; | ||
|
|
||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
| import com.jootalkpia.chat_server.dto.ChatMessageResponse; | ||
| import com.jootalkpia.chat_server.dto.ChatMessageToKafka; | ||
| import com.jootalkpia.chat_server.dto.MinutePriceResponse; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.kafka.annotation.KafkaListener; | ||
| import org.springframework.messaging.simp.SimpMessageSendingOperations; | ||
| import org.springframework.kafka.support.KafkaHeaders; | ||
| import org.springframework.messaging.handler.annotation.Header; | ||
| import org.springframework.messaging.simp.SimpMessagingTemplate; | ||
| import org.springframework.stereotype.Service; | ||
|
|
||
|
|
@@ -16,22 +16,20 @@ | |
| @RequiredArgsConstructor | ||
| public class KafkaConsumer { | ||
|
|
||
| private final ChatService chatService; | ||
| private final ObjectMapper objectMapper; | ||
| private final SimpMessagingTemplate messagingTemplate; // SimpMessagingTemplate 주입 | ||
| private final SimpMessageSendingOperations messagingTemplateBroker; // 내부 메시지 브로커 사용 | ||
| private final SimpMessagingTemplate messagingTemplate; | ||
|
|
||
| @KafkaListener( | ||
| topics = "${topic.minute}", | ||
| groupId = "${group.minute}" | ||
| ) | ||
| public void processMinutePrice(String kafkaMessage) { | ||
| log.info("Received Kafka message ===> " + kafkaMessage); | ||
| log.info("Received Kafka minute message ===> {}", kafkaMessage); | ||
| try { | ||
| MinutePriceResponse stockUpdate = objectMapper.readValue(kafkaMessage, MinutePriceResponse.class); | ||
| String stockDataJson = objectMapper.writeValueAsString(stockUpdate); | ||
|
|
||
| messagingTemplateBroker.convertAndSend("/subscribe/stock", stockDataJson); | ||
| messagingTemplate.convertAndSend("/subscribe/stock", stockDataJson); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. String으로 변경할 필요가 있을까요? stockUpdate 객체를 보내도 JSON 변환이 안되나요?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 확인해보니 SimpMessagingTemplate내 convertAndSend 메서드 체인에서 doConvert 메서드가 호출되며, 이 과정에서 MessageConverter를 사용해 payload를 Message 객체로 변환되기도 한다네요! 하지만 저희같은 경우엔 ChatMessageToKafka내에 List로 되어있는데 MessageResponse가 인터페이스다보니 Jackson이 처리를 잘 못하는 경우가 발생합니다. 그래서 명시적으로 json 객체로 변환하는 것이 안전할 것 같습니다
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 명시적으로 Json 형태의 문자열로 만든다는 의미신거죠? |
||
|
|
||
| log.info("Broadcasted stock data via WebSocket: " + stockDataJson); | ||
|
|
||
|
|
@@ -41,23 +39,22 @@ public void processMinutePrice(String kafkaMessage) { | |
| } | ||
|
|
||
| @KafkaListener( | ||
| topics = "${topic.chat}", | ||
| topics = "jootalkpia.chat.prd.message", | ||
|
||
| groupId = "${group.chat}", //추후 그룹 ID에 동적인 컨테이너 ID 삽입 | ||
| concurrency = "2" | ||
| ) | ||
| public void processChatMessage(String kafkaMessage) { | ||
| log.info("Received Kafka message ===> " + kafkaMessage); | ||
|
|
||
| ObjectMapper mapper = new ObjectMapper(); | ||
|
|
||
| public void processChatMessage(@Header(KafkaHeaders.RECEIVED_KEY) String channelId, String kafkaMessage) { | ||
| log.info("Received Kafka message ===> channelId: {}, message: {}", channelId, kafkaMessage); | ||
| try { | ||
| ChatMessageToKafka chatMessageToKafka = mapper.readValue(kafkaMessage, ChatMessageToKafka.class); | ||
| ChatMessageToKafka chatMessage = objectMapper.readValue(kafkaMessage, ChatMessageToKafka.class); | ||
| String chatDataJson = objectMapper.writeValueAsString(chatMessage); | ||
ki-met-hoon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| //로컬 메모리와 유저 ID를 비교하는 로직, 있으면 웹소켓을 통한 데이터 전달 없으면 일단 버림 | ||
| // todo : 로컬 메모리와 유저 ID를 비교하는 로직 추가 필요 | ||
| messagingTemplate.convertAndSend("/subscribe/chat." + channelId, chatDataJson); | ||
| log.info("Broadcasted chat message via WebSocket: {}", chatDataJson); | ||
|
|
||
| log.info("dto ===> " + chatMessageToKafka.toString()); | ||
| } catch (Exception ex) { | ||
| log.error(ex.getMessage(), ex); | ||
| log.error("Error processing chat message: {}", ex.getMessage(), ex); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yml 파일 설정 추가 또는 @value를 쓰는 것도 방법이라고 생각합니다!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
해당 부분은 대댓글 진행하며 prefix 추후 리팩토링 시 반영하겠습니다!