From 3cd0929b57adbaddff1c755ef494fa9a636ac6f9 Mon Sep 17 00:00:00 2001 From: bo-ram-bo-ram Date: Tue, 25 Feb 2025 13:51:24 +0900 Subject: [PATCH 1/6] =?UTF-8?q?=20#295=20feat(be)=20:=20=EC=9B=8C=ED=81=AC?= =?UTF-8?q?=EC=8A=A4=ED=8E=98=EC=9D=B4=EC=8A=A4=20=EB=82=B4=20produce=20?= =?UTF-8?q?=EB=A1=9C=EC=A7=81=20=EA=B5=AC=ED=98=84=20-=20dto?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jootalkpia/workspace_server/dto/WorkspaceToKafka.java | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/dto/WorkspaceToKafka.java diff --git a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/dto/WorkspaceToKafka.java b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/dto/WorkspaceToKafka.java new file mode 100644 index 00000000..e80ee783 --- /dev/null +++ b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/dto/WorkspaceToKafka.java @@ -0,0 +1,8 @@ +package com.jootalkpia.workspace_server.dto; + +public record WorkspaceToKafka( + Long workspaceId, + Long ChannelId, + String ChannelName +) { +} From 6409fa5f074ab487a1ce6c9335000ca26b4718ea Mon Sep 17 00:00:00 2001 From: bo-ram-bo-ram Date: Tue, 25 Feb 2025 13:51:53 +0900 Subject: [PATCH 2/6] =?UTF-8?q?=20#295=20feat(be)=20:=20=EC=9B=8C=ED=81=AC?= =?UTF-8?q?=EC=8A=A4=ED=8E=98=EC=9D=B4=EC=8A=A4=20=EB=82=B4=20produce=20?= =?UTF-8?q?=EB=A1=9C=EC=A7=81=20=EA=B5=AC=ED=98=84=20-=20KafkaProducer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jootalkpia/workspace_server/service/KafkaProducer.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/KafkaProducer.java b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/KafkaProducer.java index 3651ef92..091328ee 100644 --- a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/KafkaProducer.java +++ b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/KafkaProducer.java @@ -1,6 +1,7 @@ package com.jootalkpia.workspace_server.service; import com.fasterxml.jackson.databind.ObjectMapper; +import com.jootalkpia.workspace_server.dto.WorkspaceToKafka; import com.jootalkpia.workspace_server.entity.MessageToKafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -15,10 +16,10 @@ public class KafkaProducer { private final ObjectMapper objectMapper; private final KafkaTemplate kafkaTemplate; - public void sendChannelStatusMessage(MessageToKafka messageToKafka) { // DTO 변경 필수 + public void sendChannelStatusMessage(WorkspaceToKafka workspaceToKafka) { try { - String jsonMessage = objectMapper.writeValueAsString(messageToKafka); - kafkaTemplate.send("jootalkpia.workspace.prd.channel", jsonMessage) + String workspaceData = objectMapper.writeValueAsString(workspaceToKafka); + kafkaTemplate.send("jootalkpia.workspace.prd.channel", workspaceData) .whenComplete((result, ex) -> { //키 값 설정으로 순서 보장, 실시간성이 떨어짐, 고민해봐야 할 부분 if (ex == null) { log.info("Kafka message sent: {}", result.toString()); From 035ed99797205858c801052e28fe64437969b0a3 Mon Sep 17 00:00:00 2001 From: bo-ram-bo-ram Date: Tue, 25 Feb 2025 13:52:07 +0900 Subject: [PATCH 3/6] =?UTF-8?q?=20#295=20feat(be)=20:=20=EC=9B=8C=ED=81=AC?= =?UTF-8?q?=EC=8A=A4=ED=8E=98=EC=9D=B4=EC=8A=A4=20=EB=82=B4=20produce=20?= =?UTF-8?q?=EB=A1=9C=EC=A7=81=20=EA=B5=AC=ED=98=84=20-=20=EC=B1=84?= =?UTF-8?q?=EB=84=90=20=EC=83=9D=EC=84=B1=20=EB=A1=9C=EC=A7=81=EC=97=90=20?= =?UTF-8?q?=EC=A0=81=EC=9A=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../workspace_server/service/WorkSpaceService.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/WorkSpaceService.java b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/WorkSpaceService.java index b9d4ee04..0d72d43d 100644 --- a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/WorkSpaceService.java +++ b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/WorkSpaceService.java @@ -2,6 +2,7 @@ import com.jootalkpia.workspace_server.dto.ChannelListDTO; import com.jootalkpia.workspace_server.dto.SimpleChannel; +import com.jootalkpia.workspace_server.dto.WorkspaceToKafka; import com.jootalkpia.workspace_server.entity.Channels; import com.jootalkpia.workspace_server.entity.ChatMessage; import com.jootalkpia.workspace_server.entity.UserChannel; @@ -36,6 +37,7 @@ public class WorkSpaceService { private final WorkSpaceRepository workSpaceRepository; private final UserRepository userRepository; private final RedisTemplate redisTemplate; + private final KafkaProducer kafkaProducer; public ChannelListDTO getChannels(Long userId, Long workspaceId) { // workspaceId로 모든 채널 조회 @@ -121,6 +123,11 @@ public SimpleChannel createChannel(Long workspaceId, String channelName) { channelRepository.save(channel); + kafkaProducer.sendChannelStatusMessage( + new WorkspaceToKafka(workspaceId, + channel.getChannelId(), + channel.getName())); + return new SimpleChannel(channel.getChannelId(), channel.getName(), channel.getCreatedAt(),0L); } From 1fe91fe7623085a1db9e76f44b7e52cff179ac2c Mon Sep 17 00:00:00 2001 From: bo-ram-bo-ram Date: Tue, 25 Feb 2025 13:52:29 +0900 Subject: [PATCH 4/6] =?UTF-8?q?=20#295=20feat(be)=20:=20=EC=9B=8C=ED=81=AC?= =?UTF-8?q?=EC=8A=A4=ED=8E=98=EC=9D=B4=EC=8A=A4=20=EB=82=B4=20consume=20?= =?UTF-8?q?=EB=A1=9C=EC=A7=81=20=EA=B5=AC=ED=98=84=20-=20dto?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/jootalkpia/chat_server/dto/WorkspaceToKafka.java | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/WorkspaceToKafka.java diff --git a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/WorkspaceToKafka.java b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/WorkspaceToKafka.java new file mode 100644 index 00000000..0872b1f6 --- /dev/null +++ b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/WorkspaceToKafka.java @@ -0,0 +1,8 @@ +package com.jootalkpia.chat_server.dto; + +public record WorkspaceToKafka( + Long workspaceId, + Long ChannelId, + String ChannelName +) { +} From 6796cc3a7ef65fc7a4935d1a0852766d1a1f2e87 Mon Sep 17 00:00:00 2001 From: bo-ram-bo-ram Date: Tue, 25 Feb 2025 13:52:50 +0900 Subject: [PATCH 5/6] =?UTF-8?q?=20#295=20feat(be)=20:=20=EC=9B=8C=ED=81=AC?= =?UTF-8?q?=EC=8A=A4=ED=8E=98=EC=9D=B4=EC=8A=A4=20=EB=82=B4=20consume=20?= =?UTF-8?q?=EB=A1=9C=EC=A7=81=20=EA=B5=AC=ED=98=84=20-=20KafkaConsumer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../chat_server/service/KafkaConsumer.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java index b0b76bd0..1a58e19d 100644 --- a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java +++ b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.jootalkpia.chat_server.dto.*; +import com.jootalkpia.chat_server.dto.WorkspaceToKafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; @@ -43,7 +44,7 @@ public void processMinutePrice(String kafkaMessage) { concurrency = "2" ) public void processChatMessage(@Header(KafkaHeaders.RECEIVED_KEY) String channelId, String kafkaMessage) { - log.info("Received Kafka message ===> channelId: {}, message: {}", channelId, kafkaMessage); + log.info("Received Kafka chat message ===> channelId: {}, message: {}", channelId, kafkaMessage); try { ChatMessageToKafka chatMessage = objectMapper.readValue(kafkaMessage, ChatMessageToKafka.class); String chatDataJson = objectMapper.writeValueAsString(chatMessage); @@ -107,18 +108,18 @@ public void processHurdleStatusMessage(String kafkaMessage) { groupId = "${group.workspace}" ) public void processChannelStatusMessage(String kafkaMessage) { - log.info("message ===> " + kafkaMessage); - - ObjectMapper mapper = new ObjectMapper(); - + log.info("Received Kafka workspace message ===> {}", kafkaMessage); try { - ChatMessageToKafka chatMessageToKafka = mapper.readValue(kafkaMessage, ChatMessageToKafka.class); //추후 DTO 변경 필수 + WorkspaceToKafka workspaceData = objectMapper.readValue(kafkaMessage, WorkspaceToKafka.class); + String workspaceDataJson = objectMapper.writeValueAsString(workspaceData); + Long workspaceId = workspaceData.workspaceId(); - // 채널 생성 및 상태 정보 데이터 전달하는 로직 + messagingTemplate.convertAndSend("/subscribe/workspace" + workspaceId, workspaceDataJson); + + log.info("Broadcasted workspace data via WebSocket: " + workspaceDataJson); - log.info("dto ===> " + chatMessageToKafka.toString()); } catch (Exception ex) { - log.error(ex.getMessage(), ex); // 추후에 GlobalException 처리 + log.error("Error processing stock message: " + ex.getMessage(), ex); } } } From 48deb6148c6c88e915e7679fb42af872efe7c9e4 Mon Sep 17 00:00:00 2001 From: bo-ram-bo-ram Date: Tue, 25 Feb 2025 17:07:26 +0900 Subject: [PATCH 6/6] =?UTF-8?q?=20#295=20feat(be)=20:=20dto=EC=97=90=20cre?= =?UTF-8?q?ateUserId=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/jootalkpia/chat_server/dto/WorkspaceToKafka.java | 8 ++++++-- .../com/jootalkpia/chat_server/service/KafkaConsumer.java | 2 +- .../workspace_server/controller/WorkSpaceController.java | 2 +- .../jootalkpia/workspace_server/dto/WorkspaceToKafka.java | 8 ++++++-- .../workspace_server/service/WorkSpaceService.java | 6 ++++-- 5 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/WorkspaceToKafka.java b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/WorkspaceToKafka.java index 0872b1f6..d457b57e 100644 --- a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/WorkspaceToKafka.java +++ b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/WorkspaceToKafka.java @@ -1,8 +1,12 @@ package com.jootalkpia.chat_server.dto; +import java.time.LocalDateTime; + public record WorkspaceToKafka( Long workspaceId, - Long ChannelId, - String ChannelName + Long createUserId, + Long channelId, + String channelName, + LocalDateTime createdAt ) { } diff --git a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java index 1a58e19d..677fed9d 100644 --- a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java +++ b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java @@ -114,7 +114,7 @@ public void processChannelStatusMessage(String kafkaMessage) { String workspaceDataJson = objectMapper.writeValueAsString(workspaceData); Long workspaceId = workspaceData.workspaceId(); - messagingTemplate.convertAndSend("/subscribe/workspace" + workspaceId, workspaceDataJson); + messagingTemplate.convertAndSend("/subscribe/workspace." + workspaceId, workspaceDataJson); log.info("Broadcasted workspace data via WebSocket: " + workspaceDataJson); diff --git a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/controller/WorkSpaceController.java b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/controller/WorkSpaceController.java index 63c5320a..1f8eee89 100644 --- a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/controller/WorkSpaceController.java +++ b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/controller/WorkSpaceController.java @@ -41,7 +41,7 @@ public ResponseEntity createChannel(@PathVariable Long workspaceI ValidationUtils.validateWorkSpaceId(workspaceId); log.info("Creating channels for workspace with id: {}", workspaceId); - SimpleChannel channel = workSpaceService.createChannel(workspaceId, channelName); + SimpleChannel channel = workSpaceService.createChannel(workspaceId, channelName,userId); // 유저를 생성된 채널에 가입시킴 workSpaceService.addMember(workspaceId, userId, channel.getChannelId()); diff --git a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/dto/WorkspaceToKafka.java b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/dto/WorkspaceToKafka.java index e80ee783..b3faaf74 100644 --- a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/dto/WorkspaceToKafka.java +++ b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/dto/WorkspaceToKafka.java @@ -1,8 +1,12 @@ package com.jootalkpia.workspace_server.dto; +import java.time.LocalDateTime; + public record WorkspaceToKafka( Long workspaceId, - Long ChannelId, - String ChannelName + Long createUserId, + Long channelId, + String channelName, + LocalDateTime createdAt ) { } diff --git a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/WorkSpaceService.java b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/WorkSpaceService.java index 0d72d43d..da28a1aa 100644 --- a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/WorkSpaceService.java +++ b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/WorkSpaceService.java @@ -108,7 +108,7 @@ private ChannelListDTO createChannelListDTO(List joinedChannels, return channelListDTO; } - public SimpleChannel createChannel(Long workspaceId, String channelName) { + public SimpleChannel createChannel(Long workspaceId, String channelName,Long userId) { // WorkSpace 객체 조회 WorkSpace workSpace = fetchWorkSpace(workspaceId); @@ -125,8 +125,10 @@ public SimpleChannel createChannel(Long workspaceId, String channelName) { kafkaProducer.sendChannelStatusMessage( new WorkspaceToKafka(workspaceId, + userId, channel.getChannelId(), - channel.getName())); + channel.getName() + ,channel.getCreatedAt())); return new SimpleChannel(channel.getChannelId(), channel.getName(), channel.getCreatedAt(),0L); }