diff --git a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java index d5b30b39..a7f1006c 100644 --- a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java +++ b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java @@ -20,11 +20,11 @@ public class KafkaConsumer { @KafkaListener( topics = "${topic.minute}", - groupId = "${group.minute}" + groupId = "${group.minute}", + concurrency = "2" ) public void processMinutePrice(String kafkaMessage) { log.info("Received Kafka minute message ===> {}", kafkaMessage); - try { MinutePriceResponse stockUpdate = objectMapper.readValue(kafkaMessage, MinutePriceResponse.class); String stockDataJson = objectMapper.writeValueAsString(stockUpdate); @@ -45,14 +45,12 @@ public void processMinutePrice(String kafkaMessage) { ) public void processChatMessage(@Header(KafkaHeaders.RECEIVED_KEY) String channelId, String kafkaMessage) { log.info("Received Kafka message ===> channelId: {}, message: {}", channelId, kafkaMessage); - try { ChatMessageToKafka chatMessage = objectMapper.readValue(kafkaMessage, ChatMessageToKafka.class); String chatDataJson = objectMapper.writeValueAsString(chatMessage); messagingTemplate.convertAndSend("/subscribe/chat." + channelId, chatDataJson); log.info("Broadcasted chat message via WebSocket: {}", chatDataJson); - log.info("/subscribe/chat." + channelId); } catch (Exception ex) { log.error("Error processing chat message: {}", ex.getMessage(), ex); @@ -84,4 +82,44 @@ public void processPushMessage(String kafkaMessage) { log.error(ex.getMessage(), ex); // 추후에 GlobalException 처리 } } + + @KafkaListener( + topics = "${topic.hurdle}", + groupId = "${group.hurdle}" + ) + public void processHurdleStatusMessage(String kafkaMessage) { + log.info("message ===> " + kafkaMessage); + + ObjectMapper mapper = new ObjectMapper(); + + try { + ChatMessageToKafka chatMessageToKafka = mapper.readValue(kafkaMessage, ChatMessageToKafka.class); //추후 DTO 변경 필수 + + // 클라이언트에게 허들 관련 데이터 전달하는 로직 + + log.info("dto ===> " + chatMessageToKafka.toString()); + } catch (Exception ex) { + log.error(ex.getMessage(), ex); // 추후에 GlobalException 처리 + } + } + + @KafkaListener( + topics = "${topic.workspace}", + groupId = "${group.workspace}" + ) + public void processChannelStatusMessage(String kafkaMessage) { + log.info("message ===> " + kafkaMessage); + + ObjectMapper mapper = new ObjectMapper(); + + try { + ChatMessageToKafka chatMessageToKafka = mapper.readValue(kafkaMessage, ChatMessageToKafka.class); //추후 DTO 변경 필수 + + // 채널 생성 및 상태 정보 데이터 전달하는 로직 + + log.info("dto ===> " + chatMessageToKafka.toString()); + } catch (Exception ex) { + log.error(ex.getMessage(), ex); // 추후에 GlobalException 처리 + } + } } diff --git a/src/backend/chat_server/src/main/resources/application.yml b/src/backend/chat_server/src/main/resources/application.yml index 3ee1c91a..f8351d8a 100644 --- a/src/backend/chat_server/src/main/resources/application.yml +++ b/src/backend/chat_server/src/main/resources/application.yml @@ -38,11 +38,15 @@ topic: minute: jootalkpia.stock.prd.minute chat: jootalkpia.chat.prd.message push: jootalkpia.push.prd.message + hurdle: jootalkpia.hurdle.prd.status + workspace: jootalkpia.workspace.prd.channel group: - minute: minute-price-save-consumer-group + minute: minute-price-save-consumer-group-${SERVER_PORT} chat: chat-message-handle-consumer-group-${SERVER_PORT} push: push-message-handle-consumer-group-${SERVER_PORT} + hurdle: hurdle-status-handle-consumer-group-${SERVER_PORT} + workspace: workspace-channel-handle-consumer-group-${SERVER_PORT} server: port: ${SERVER_PORT} diff --git a/src/backend/push_server/build.gradle b/src/backend/push_server/build.gradle index 99b624c5..80490cce 100644 --- a/src/backend/push_server/build.gradle +++ b/src/backend/push_server/build.gradle @@ -25,10 +25,10 @@ repositories { dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' - implementation 'org.springframework.kafka:spring-kafka' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' + implementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.kafka:spring-kafka-test' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' } diff --git a/src/backend/signaling_server/build.gradle b/src/backend/signaling_server/build.gradle index 7f1ea3a6..c47086fb 100644 --- a/src/backend/signaling_server/build.gradle +++ b/src/backend/signaling_server/build.gradle @@ -50,6 +50,9 @@ dependencies { testRuntimeOnly 'org.junit.platform:junit-platform-launcher' implementation 'org.springframework.boot:spring-boot-starter-data-redis' + + implementation 'org.springframework.kafka:spring-kafka' + testImplementation 'org.springframework.kafka:spring-kafka-test' } tasks.named('test') { diff --git a/src/backend/signaling_server/src/main/java/com/jootalkpia/signaling_server/model/MessageToKafka.java b/src/backend/signaling_server/src/main/java/com/jootalkpia/signaling_server/model/MessageToKafka.java new file mode 100644 index 00000000..fd04b96b --- /dev/null +++ b/src/backend/signaling_server/src/main/java/com/jootalkpia/signaling_server/model/MessageToKafka.java @@ -0,0 +1,4 @@ +package com.jootalkpia.signaling_server.model; + +public record MessageToKafka() { +} diff --git a/src/backend/signaling_server/src/main/java/com/jootalkpia/signaling_server/service/KafkaProducer.java b/src/backend/signaling_server/src/main/java/com/jootalkpia/signaling_server/service/KafkaProducer.java new file mode 100644 index 00000000..77283f2f --- /dev/null +++ b/src/backend/signaling_server/src/main/java/com/jootalkpia/signaling_server/service/KafkaProducer.java @@ -0,0 +1,28 @@ +package com.jootalkpia.signaling_server.service; + +import com.google.gson.Gson; +import com.jootalkpia.signaling_server.model.MessageToKafka; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +@RequiredArgsConstructor +public class KafkaProducer { + private final Gson gson = new Gson(); + private final KafkaTemplate kafkaTemplate; + + public void sendHurdleStatusMessage(MessageToKafka messageToKafka) { // 미르님 원하는 DTO로 변경 필수 + String jsonHurdleStatusMessage = gson.toJson(messageToKafka); + + kafkaTemplate.send("jootalkpia.hurdle.prd.status", jsonHurdleStatusMessage).whenComplete((result, ex) -> { + if (ex == null) { + log.info(result.toString()); + } else { + log.error(ex.getMessage(), ex); //추후 예외처리 + } + }); + } +} diff --git a/src/backend/signaling_server/src/main/resources/application.yml b/src/backend/signaling_server/src/main/resources/application.yml index b4ce1e98..8880cd0e 100644 --- a/src/backend/signaling_server/src/main/resources/application.yml +++ b/src/backend/signaling_server/src/main/resources/application.yml @@ -13,6 +13,12 @@ spring: broker: enabled: true destinationPrefixes: /topic, /queue + kafka: + producer: + acks: all # 멱등성 프로듀서를 위해 all로 설정, 0 또는 1이면 enable.idempotence=true 불가 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + bootstrap-servers: ${KAFKA_SERVER} kurento: ws: diff --git a/src/backend/workspace_server/.gradle/8.11.1/fileHashes/fileHashes.lock b/src/backend/workspace_server/.gradle/8.11.1/fileHashes/fileHashes.lock index f64ef8be..ab9fedee 100644 Binary files a/src/backend/workspace_server/.gradle/8.11.1/fileHashes/fileHashes.lock and b/src/backend/workspace_server/.gradle/8.11.1/fileHashes/fileHashes.lock differ diff --git a/src/backend/workspace_server/build.gradle b/src/backend/workspace_server/build.gradle index e8e63926..097cfa29 100644 --- a/src/backend/workspace_server/build.gradle +++ b/src/backend/workspace_server/build.gradle @@ -28,10 +28,13 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.boot:spring-boot-starter-data-redis' + implementation 'org.springframework.kafka:spring-kafka' + implementation group: 'com.google.code.gson', name: 'gson', version: '2.8.6' compileOnly 'org.projectlombok:lombok' runtimeOnly 'org.postgresql:postgresql:42.7.4' annotationProcessor 'org.projectlombok:lombok' annotationProcessor "org.springframework.boot:spring-boot-configuration-processor" + testImplementation 'org.springframework.kafka:spring-kafka-test' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.kafka:spring-kafka-test' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' diff --git a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/entity/MessageToKafka.java b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/entity/MessageToKafka.java new file mode 100644 index 00000000..fa21b246 --- /dev/null +++ b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/entity/MessageToKafka.java @@ -0,0 +1,4 @@ +package com.jootalkpia.workspace_server.entity; + +public record MessageToKafka() { +} diff --git a/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/KafkaProducer.java b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/KafkaProducer.java new file mode 100644 index 00000000..3651ef92 --- /dev/null +++ b/src/backend/workspace_server/src/main/java/com/jootalkpia/workspace_server/service/KafkaProducer.java @@ -0,0 +1,33 @@ +package com.jootalkpia.workspace_server.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jootalkpia.workspace_server.entity.MessageToKafka; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +@RequiredArgsConstructor +public class KafkaProducer { + + private final ObjectMapper objectMapper; + private final KafkaTemplate kafkaTemplate; + + public void sendChannelStatusMessage(MessageToKafka messageToKafka) { // DTO 변경 필수 + try { + String jsonMessage = objectMapper.writeValueAsString(messageToKafka); + kafkaTemplate.send("jootalkpia.workspace.prd.channel", jsonMessage) + .whenComplete((result, ex) -> { //키 값 설정으로 순서 보장, 실시간성이 떨어짐, 고민해봐야 할 부분 + if (ex == null) { + log.info("Kafka message sent: {}", result.toString()); + } else { + log.error("Error sending Kafka message: {}", ex.getMessage(), ex); + } + }); + } catch (Exception e) { + log.error("Error serializing chat message: {}", e.getMessage(), e); + } + } +} diff --git a/src/backend/workspace_server/src/main/resources/application.yml b/src/backend/workspace_server/src/main/resources/application.yml index 7a6c4a25..88ff8a15 100644 --- a/src/backend/workspace_server/src/main/resources/application.yml +++ b/src/backend/workspace_server/src/main/resources/application.yml @@ -25,6 +25,12 @@ spring: org.hibernate.type.descriptor.sql: TRACE # SQL 매개변수 출력 com.jootalkpia.workspace_server.service: DEBUG # workspace 서비스 패키지 로그 org.springframework.data.redis: DEBUG # Redis 작업 로그 + kafka: + bootstrap-servers: ${KAFKA_SERVER} + producer: + acks: all # 멱등성 프로듀서를 위해 all로 설정, 0 또는 1이면 enable.idempotence=true 불가 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer management: endpoints: web: