Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.jootalkpia.chat_server.dto;

import java.time.LocalDateTime;

public record WorkspaceToKafka(
Long workspaceId,
Long createUserId,
Long channelId,
String channelName,
LocalDateTime createdAt
) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.jootalkpia.chat_server.dto.*;
import com.jootalkpia.chat_server.dto.WorkspaceToKafka;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
Expand Down Expand Up @@ -43,7 +44,7 @@ public void processMinutePrice(String kafkaMessage) {
concurrency = "2"
)
public void processChatMessage(@Header(KafkaHeaders.RECEIVED_KEY) String channelId, String kafkaMessage) {
log.info("Received Kafka message ===> channelId: {}, message: {}", channelId, kafkaMessage);
log.info("Received Kafka chat message ===> channelId: {}, message: {}", channelId, kafkaMessage);
try {
ChatMessageToKafka chatMessage = objectMapper.readValue(kafkaMessage, ChatMessageToKafka.class);
String chatDataJson = objectMapper.writeValueAsString(chatMessage);
Expand Down Expand Up @@ -107,18 +108,18 @@ public void processHurdleStatusMessage(String kafkaMessage) {
groupId = "${group.workspace}"
)
public void processChannelStatusMessage(String kafkaMessage) {
log.info("message ===> " + kafkaMessage);

ObjectMapper mapper = new ObjectMapper();

log.info("Received Kafka workspace message ===> {}", kafkaMessage);
try {
ChatMessageToKafka chatMessageToKafka = mapper.readValue(kafkaMessage, ChatMessageToKafka.class); //추후 DTO 변경 필수
WorkspaceToKafka workspaceData = objectMapper.readValue(kafkaMessage, WorkspaceToKafka.class);
String workspaceDataJson = objectMapper.writeValueAsString(workspaceData);
Long workspaceId = workspaceData.workspaceId();

// 채널 생성 및 상태 정보 데이터 전달하는 로직
messagingTemplate.convertAndSend("/subscribe/workspace." + workspaceId, workspaceDataJson);

log.info("Broadcasted workspace data via WebSocket: " + workspaceDataJson);

log.info("dto ===> " + chatMessageToKafka.toString());
} catch (Exception ex) {
log.error(ex.getMessage(), ex); // 추후에 GlobalException 처리
log.error("Error processing stock message: " + ex.getMessage(), ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public ResponseEntity<SimpleChannel> createChannel(@PathVariable Long workspaceI
ValidationUtils.validateWorkSpaceId(workspaceId);
log.info("Creating channels for workspace with id: {}", workspaceId);

SimpleChannel channel = workSpaceService.createChannel(workspaceId, channelName);
SimpleChannel channel = workSpaceService.createChannel(workspaceId, channelName,userId);

// 유저를 생성된 채널에 가입시킴
workSpaceService.addMember(workspaceId, userId, channel.getChannelId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.jootalkpia.workspace_server.dto;

import java.time.LocalDateTime;

public record WorkspaceToKafka(
Long workspaceId,
Long createUserId,
Long channelId,
String channelName,
LocalDateTime createdAt
) {
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.jootalkpia.workspace_server.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.jootalkpia.workspace_server.dto.WorkspaceToKafka;
import com.jootalkpia.workspace_server.entity.MessageToKafka;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -15,10 +16,10 @@ public class KafkaProducer {
private final ObjectMapper objectMapper;
private final KafkaTemplate<String, String> kafkaTemplate;

public void sendChannelStatusMessage(MessageToKafka messageToKafka) { // DTO 변경 필수
public void sendChannelStatusMessage(WorkspaceToKafka workspaceToKafka) {
try {
String jsonMessage = objectMapper.writeValueAsString(messageToKafka);
kafkaTemplate.send("jootalkpia.workspace.prd.channel", jsonMessage)
String workspaceData = objectMapper.writeValueAsString(workspaceToKafka);
kafkaTemplate.send("jootalkpia.workspace.prd.channel", workspaceData)
.whenComplete((result, ex) -> { //키 값 설정으로 순서 보장, 실시간성이 떨어짐, 고민해봐야 할 부분
if (ex == null) {
log.info("Kafka message sent: {}", result.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.jootalkpia.workspace_server.dto.ChannelListDTO;
import com.jootalkpia.workspace_server.dto.SimpleChannel;
import com.jootalkpia.workspace_server.dto.WorkspaceToKafka;
import com.jootalkpia.workspace_server.entity.Channels;
import com.jootalkpia.workspace_server.entity.ChatMessage;
import com.jootalkpia.workspace_server.entity.UserChannel;
Expand Down Expand Up @@ -36,6 +37,7 @@ public class WorkSpaceService {
private final WorkSpaceRepository workSpaceRepository;
private final UserRepository userRepository;
private final RedisTemplate<String, String> redisTemplate;
private final KafkaProducer kafkaProducer;

public ChannelListDTO getChannels(Long userId, Long workspaceId) {
// workspaceId로 모든 채널 조회
Expand Down Expand Up @@ -106,7 +108,7 @@ private ChannelListDTO createChannelListDTO(List<SimpleChannel> joinedChannels,
return channelListDTO;
}

public SimpleChannel createChannel(Long workspaceId, String channelName) {
public SimpleChannel createChannel(Long workspaceId, String channelName,Long userId) {
// WorkSpace 객체 조회
WorkSpace workSpace = fetchWorkSpace(workspaceId);

Expand All @@ -121,6 +123,13 @@ public SimpleChannel createChannel(Long workspaceId, String channelName) {

channelRepository.save(channel);

kafkaProducer.sendChannelStatusMessage(
new WorkspaceToKafka(workspaceId,
userId,
channel.getChannelId(),
channel.getName()
,channel.getCreatedAt()));

return new SimpleChannel(channel.getChannelId(), channel.getName(), channel.getCreatedAt(),0L);
}

Expand Down