diff --git a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/WorkspaceToKafka.java b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/WorkspaceToKafka.java new file mode 100644 index 00000000..d457b57e --- /dev/null +++ b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/WorkspaceToKafka.java @@ -0,0 +1,12 @@ +package com.jootalkpia.chat_server.dto; + +import java.time.LocalDateTime; + +public record WorkspaceToKafka( + Long workspaceId, + Long createUserId, + Long channelId, + String channelName, + LocalDateTime createdAt +) { +} diff --git a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java index b0b76bd0..677fed9d 100644 --- a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java +++ b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.jootalkpia.chat_server.dto.*; +import com.jootalkpia.chat_server.dto.WorkspaceToKafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; @@ -43,7 +44,7 @@ public void processMinutePrice(String kafkaMessage) { concurrency = "2" ) public void processChatMessage(@Header(KafkaHeaders.RECEIVED_KEY) String channelId, String kafkaMessage) { - log.info("Received Kafka message ===> channelId: {}, message: {}", channelId, kafkaMessage); + log.info("Received Kafka chat message ===> channelId: {}, message: {}", channelId, kafkaMessage); try { ChatMessageToKafka chatMessage = objectMapper.readValue(kafkaMessage, ChatMessageToKafka.class); String chatDataJson = objectMapper.writeValueAsString(chatMessage); @@ -107,18 +108,18 @@ public void processHurdleStatusMessage(String kafkaMessage) { groupId = "${group.workspace}" ) public void processChannelStatusMessage(String kafkaMessage) { - log.info("message ===> " + kafkaMessage); - - ObjectMapper mapper = new ObjectMapper(); - + log.info("Received Kafka workspace message ===> {}", kafkaMessage); try { - ChatMessageToKafka chatMessageToKafka = mapper.readValue(kafkaMessage, ChatMessageToKafka.class); //추후 DTO 변경 필수 + WorkspaceToKafka workspaceData = objectMapper.readValue(kafkaMessage, WorkspaceToKafka.class); + String workspaceDataJson = objectMapper.writeValueAsString(workspaceData); + Long workspaceId = workspaceData.workspaceId(); - // 채널 생성 및 상태 정보 데이터 전달하는 로직 + messagingTemplate.convertAndSend("/subscribe/workspace." + workspaceId, workspaceDataJson); + + log.info("Broadcasted workspace data via WebSocket: " + workspaceDataJson); - log.info("dto ===> " + chatMessageToKafka.toString()); } catch (Exception ex) { - log.error(ex.getMessage(), ex); // 추후에 GlobalException 처리 + log.error("Error processing stock message: " + ex.getMessage(), ex); } } } diff --git a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/controller/WorkSpaceController.java b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/controller/WorkSpaceController.java index 63c5320a..1f8eee89 100644 --- a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/controller/WorkSpaceController.java +++ b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/controller/WorkSpaceController.java @@ -41,7 +41,7 @@ public ResponseEntity createChannel(@PathVariable Long workspaceI ValidationUtils.validateWorkSpaceId(workspaceId); log.info("Creating channels for workspace with id: {}", workspaceId); - SimpleChannel channel = workSpaceService.createChannel(workspaceId, channelName); + SimpleChannel channel = workSpaceService.createChannel(workspaceId, channelName,userId); // 유저를 생성된 채널에 가입시킴 workSpaceService.addMember(workspaceId, userId, channel.getChannelId()); diff --git a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/dto/WorkspaceToKafka.java b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/dto/WorkspaceToKafka.java new file mode 100644 index 00000000..b3faaf74 --- /dev/null +++ b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/dto/WorkspaceToKafka.java @@ -0,0 +1,12 @@ +package com.jootalkpia.workspace_server.dto; + +import java.time.LocalDateTime; + +public record WorkspaceToKafka( + Long workspaceId, + Long createUserId, + Long channelId, + String channelName, + LocalDateTime createdAt +) { +} diff --git a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/KafkaProducer.java b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/KafkaProducer.java index 3651ef92..091328ee 100644 --- a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/KafkaProducer.java +++ b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/KafkaProducer.java @@ -1,6 +1,7 @@ package com.jootalkpia.workspace_server.service; import com.fasterxml.jackson.databind.ObjectMapper; +import com.jootalkpia.workspace_server.dto.WorkspaceToKafka; import com.jootalkpia.workspace_server.entity.MessageToKafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -15,10 +16,10 @@ public class KafkaProducer { private final ObjectMapper objectMapper; private final KafkaTemplate kafkaTemplate; - public void sendChannelStatusMessage(MessageToKafka messageToKafka) { // DTO 변경 필수 + public void sendChannelStatusMessage(WorkspaceToKafka workspaceToKafka) { try { - String jsonMessage = objectMapper.writeValueAsString(messageToKafka); - kafkaTemplate.send("jootalkpia.workspace.prd.channel", jsonMessage) + String workspaceData = objectMapper.writeValueAsString(workspaceToKafka); + kafkaTemplate.send("jootalkpia.workspace.prd.channel", workspaceData) .whenComplete((result, ex) -> { //키 값 설정으로 순서 보장, 실시간성이 떨어짐, 고민해봐야 할 부분 if (ex == null) { log.info("Kafka message sent: {}", result.toString()); diff --git a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/WorkSpaceService.java b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/WorkSpaceService.java index b9d4ee04..da28a1aa 100644 --- a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/WorkSpaceService.java +++ b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/WorkSpaceService.java @@ -2,6 +2,7 @@ import com.jootalkpia.workspace_server.dto.ChannelListDTO; import com.jootalkpia.workspace_server.dto.SimpleChannel; +import com.jootalkpia.workspace_server.dto.WorkspaceToKafka; import com.jootalkpia.workspace_server.entity.Channels; import com.jootalkpia.workspace_server.entity.ChatMessage; import com.jootalkpia.workspace_server.entity.UserChannel; @@ -36,6 +37,7 @@ public class WorkSpaceService { private final WorkSpaceRepository workSpaceRepository; private final UserRepository userRepository; private final RedisTemplate redisTemplate; + private final KafkaProducer kafkaProducer; public ChannelListDTO getChannels(Long userId, Long workspaceId) { // workspaceId로 모든 채널 조회 @@ -106,7 +108,7 @@ private ChannelListDTO createChannelListDTO(List joinedChannels, return channelListDTO; } - public SimpleChannel createChannel(Long workspaceId, String channelName) { + public SimpleChannel createChannel(Long workspaceId, String channelName,Long userId) { // WorkSpace 객체 조회 WorkSpace workSpace = fetchWorkSpace(workspaceId); @@ -121,6 +123,13 @@ public SimpleChannel createChannel(Long workspaceId, String channelName) { channelRepository.save(channel); + kafkaProducer.sendChannelStatusMessage( + new WorkspaceToKafka(workspaceId, + userId, + channel.getChannelId(), + channel.getName() + ,channel.getCreatedAt())); + return new SimpleChannel(channel.getChannelId(), channel.getName(), channel.getCreatedAt(),0L); }