diff --git a/src/backend/chat_server/build.gradle b/src/backend/chat_server/build.gradle index a736a632..a8539891 100644 --- a/src/backend/chat_server/build.gradle +++ b/src/backend/chat_server/build.gradle @@ -27,7 +27,6 @@ dependencies { implementation group: 'org.postgresql', name: 'postgresql', version: '42.7.3' implementation 'org.springframework.boot:spring-boot-starter-data-jpa' - // Lombok compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' @@ -43,6 +42,13 @@ dependencies { //socket implementation 'org.springframework.boot:spring-boot-starter-websocket' + + //Kafka + implementation 'org.springframework.kafka:spring-kafka' + testImplementation 'org.springframework.kafka:spring-kafka-test' + + //Gson + implementation group: 'com.google.code.gson', name: 'gson', version: '2.8.6' } tasks.named('test') { diff --git a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/ChatMessageToKafka.java b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/ChatMessageToKafka.java new file mode 100644 index 00000000..4d0b0795 --- /dev/null +++ b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/ChatMessageToKafka.java @@ -0,0 +1,8 @@ +package com.jootalkpia.chat_server.dto; + +public record ChatMessageToKafka( + Long userId, + String username, + String content +) { +} diff --git a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/MinutePriceResponse.java b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/MinutePriceResponse.java new file mode 100644 index 00000000..108e6213 --- /dev/null +++ b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/dto/MinutePriceResponse.java @@ -0,0 +1,27 @@ +package com.jootalkpia.chat_server.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public record MinutePriceResponse( + @JsonProperty("code") + String code, + @JsonProperty("htsKorIsnm") + String stockName, + @JsonProperty("stckBsopDate") + String businessDate, + @JsonProperty("stckCntgHour") + String tradingTime, + @JsonProperty("stckPrpr") + String currentPrice, + @JsonProperty("stckOprc") + String openPrice, + @JsonProperty("stckHgpr") + String highPrice, + @JsonProperty("stckLwpr") + String lowPrice, + @JsonProperty("cntgVol") + String tradingVolume, + @JsonProperty("acmlTrPbmn") + String totalTradeAmount +) { +} \ No newline at end of file diff --git a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java new file mode 100644 index 00000000..cfd384ef --- /dev/null +++ b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaConsumer.java @@ -0,0 +1,53 @@ +package com.jootalkpia.chat_server.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jootalkpia.chat_server.dto.ChatMessageToKafka; +import com.jootalkpia.chat_server.dto.MinutePriceResponse; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class KafkaConsumer { + @KafkaListener( + topics = "jootalkpia.stock.local.minute", + groupId = "minute-price-save-consumer-group" + ) + public void processMinutePrice(String kafkaMessage) { + log.info("message ===> " + kafkaMessage); + + ObjectMapper mapper = new ObjectMapper(); + + try { + MinutePriceResponse minutePriceResponse = mapper.readValue(kafkaMessage, MinutePriceResponse.class); + + //웹소켓 전달하는 로직 or 전달하는 함수 + + log.info("dto ===> " + minutePriceResponse.toString()); + } catch (Exception ex) { + log.error(ex.getMessage(), ex); // 추후에 GlobalException 처리 + } + } + + @KafkaListener( + topics = "jootalkpia.chat.local.message", + groupId = "chat-message-handle-consumer-group", //추후 그룹 ID에 동적인 컨테이너 ID 삽입 + concurrency = "2" + ) + public void processChatMessage(String kafkaMessage) { + log.info("message ===> " + kafkaMessage); + + ObjectMapper mapper = new ObjectMapper(); + + try { + ChatMessageToKafka chatMessageToKafka = mapper.readValue(kafkaMessage, ChatMessageToKafka.class); + + //로컬 메모리와 유저 ID를 비교하는 로직, 있으면 웹소켓을 통한 데이터 전달 없으면 일단 버림 + + log.info("dto ===> " + chatMessageToKafka.toString()); + } catch (Exception ex) { + log.error(ex.getMessage(), ex); + } + } +} diff --git a/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaProducer.java b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaProducer.java new file mode 100644 index 00000000..7e672224 --- /dev/null +++ b/src/backend/chat_server/src/main/java/com/jootalkpia/chat_server/service/KafkaProducer.java @@ -0,0 +1,28 @@ +package com.jootalkpia.chat_server.service; + +import com.google.gson.Gson; +import com.jootalkpia.chat_server.dto.ChatMessageToKafka; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +@RequiredArgsConstructor +public class KafkaProducer { + + private final Gson gson = new Gson(); + private final KafkaTemplate kafkaTemplate; + + public void sendChatMessage(ChatMessageToKafka chatMessageToKafka, Long roomId) { + String jsonChatMessage = gson.toJson(chatMessageToKafka); + kafkaTemplate.send("jootalkpia.chat.local.message", String.valueOf(roomId), jsonChatMessage).whenComplete((result, ex) -> { //키 값 설정으로 순서 보장, 실시간성이 떨어짐, 고민해봐야 할 부분 + if (ex == null) { + log.info(result.toString()); + } else { + log.error(ex.getMessage(), ex); //추후 예외처리 + } + }); + } +} diff --git a/src/backend/chat_server/src/main/resources/application.yml b/src/backend/chat_server/src/main/resources/application.yml index f2632b19..dcd7a5cf 100644 --- a/src/backend/chat_server/src/main/resources/application.yml +++ b/src/backend/chat_server/src/main/resources/application.yml @@ -7,6 +7,15 @@ spring: hibernate: format_sql: true show_sql: true + kafka: + bootstrap-servers: ${KAFKA_SERVER} + consumer: + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + producer: + acks: all # 멱등성 프로듀서를 위해 all로 설정, 0 또는 1이면 enable.idempotence=true 불가 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer datasource: driver-class-name: org.postgresql.Driver url: jdbc:postgresql://${DB_HOST}:${DB_PORT}/${DB_NAME}