Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/backend/chat_server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ dependencies {
implementation group: 'org.postgresql', name: 'postgresql', version: '42.7.3'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'


// Lombok
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
Expand All @@ -43,6 +42,13 @@ dependencies {

//socket
implementation 'org.springframework.boot:spring-boot-starter-websocket'

//Kafka
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

//Gson
implementation group: 'com.google.code.gson', name: 'gson', version: '2.8.6'
}

tasks.named('test') {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.jootalkpia.chat_server.dto;

public record ChatMessageToKafka(
Long userId,
String username,
String content
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.jootalkpia.chat_server.dto;

import com.fasterxml.jackson.annotation.JsonProperty;

public record MinutePriceResponse(
@JsonProperty("code")
String code,
@JsonProperty("htsKorIsnm")
String stockName,
@JsonProperty("stckBsopDate")
String businessDate,
@JsonProperty("stckCntgHour")
String tradingTime,
@JsonProperty("stckPrpr")
String currentPrice,
@JsonProperty("stckOprc")
String openPrice,
@JsonProperty("stckHgpr")
String highPrice,
@JsonProperty("stckLwpr")
String lowPrice,
@JsonProperty("cntgVol")
String tradingVolume,
@JsonProperty("acmlTrPbmn")
String totalTradeAmount
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.jootalkpia.chat_server.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.jootalkpia.chat_server.dto.ChatMessageToKafka;
import com.jootalkpia.chat_server.dto.MinutePriceResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaConsumer {
@KafkaListener(
topics = "jootalkpia.stock.local.minute",
groupId = "minute-price-save-consumer-group"
)
public void processMinutePrice(String kafkaMessage) {
log.info("message ===> " + kafkaMessage);

ObjectMapper mapper = new ObjectMapper();

try {
MinutePriceResponse minutePriceResponse = mapper.readValue(kafkaMessage, MinutePriceResponse.class);

//웹소켓 전달하는 로직 or 전달하는 함수

log.info("dto ===> " + minutePriceResponse.toString());
} catch (Exception ex) {
log.error(ex.getMessage(), ex); // 추후에 GlobalException 처리
}
}

@KafkaListener(
topics = "jootalkpia.chat.local.message",
groupId = "chat-message-handle-consumer-group", //추후 그룹 ID에 동적인 컨테이너 ID 삽입
concurrency = "2"
)
public void processChatMessage(String kafkaMessage) {
log.info("message ===> " + kafkaMessage);

ObjectMapper mapper = new ObjectMapper();

try {
ChatMessageToKafka chatMessageToKafka = mapper.readValue(kafkaMessage, ChatMessageToKafka.class);

//로컬 메모리와 유저 ID를 비교하는 로직, 있으면 웹소켓을 통한 데이터 전달 없으면 일단 버림

log.info("dto ===> " + chatMessageToKafka.toString());
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.jootalkpia.chat_server.service;

import com.google.gson.Gson;
import com.jootalkpia.chat_server.dto.ChatMessageToKafka;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {

private final Gson gson = new Gson();
private final KafkaTemplate<String, String> kafkaTemplate;

public void sendChatMessage(ChatMessageToKafka chatMessageToKafka, Long roomId) {
String jsonChatMessage = gson.toJson(chatMessageToKafka);
kafkaTemplate.send("jootalkpia.chat.local.message", String.valueOf(roomId), jsonChatMessage).whenComplete((result, ex) -> { //키 값 설정으로 순서 보장, 실시간성이 떨어짐, 고민해봐야 할 부분
if (ex == null) {
log.info(result.toString());
} else {
log.error(ex.getMessage(), ex); //추후 예외처리
}
});
}
}
9 changes: 9 additions & 0 deletions src/backend/chat_server/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ spring:
hibernate:
format_sql: true
show_sql: true
kafka:
bootstrap-servers: ${KAFKA_SERVER}
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
acks: all # 멱등성 프로듀서를 위해 all로 설정, 0 또는 1이면 enable.idempotence=true 불가
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
datasource:
driver-class-name: org.postgresql.Driver
url: jdbc:postgresql://${DB_HOST}:${DB_PORT}/${DB_NAME}
Expand Down