Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ public class KafkaConsumer {

@KafkaListener(
topics = "${topic.minute}",
groupId = "${group.minute}"
groupId = "${group.minute}",
concurrency = "2"
)
public void processMinutePrice(String kafkaMessage) {
log.info("Received Kafka minute message ===> {}", kafkaMessage);

try {
MinutePriceResponse stockUpdate = objectMapper.readValue(kafkaMessage, MinutePriceResponse.class);
String stockDataJson = objectMapper.writeValueAsString(stockUpdate);
Expand All @@ -45,14 +45,12 @@ public void processMinutePrice(String kafkaMessage) {
)
public void processChatMessage(@Header(KafkaHeaders.RECEIVED_KEY) String channelId, String kafkaMessage) {
log.info("Received Kafka message ===> channelId: {}, message: {}", channelId, kafkaMessage);

try {
ChatMessageToKafka chatMessage = objectMapper.readValue(kafkaMessage, ChatMessageToKafka.class);
String chatDataJson = objectMapper.writeValueAsString(chatMessage);

messagingTemplate.convertAndSend("/subscribe/chat." + channelId, chatDataJson);
log.info("Broadcasted chat message via WebSocket: {}", chatDataJson);
log.info("/subscribe/chat." + channelId);

} catch (Exception ex) {
log.error("Error processing chat message: {}", ex.getMessage(), ex);
Expand Down Expand Up @@ -84,4 +82,44 @@ public void processPushMessage(String kafkaMessage) {
log.error(ex.getMessage(), ex); // 추후에 GlobalException 처리
}
}

@KafkaListener(
topics = "${topic.hurdle}",
groupId = "${group.hurdle}"
)
public void processHurdleStatusMessage(String kafkaMessage) {
log.info("message ===> " + kafkaMessage);

ObjectMapper mapper = new ObjectMapper();

try {
ChatMessageToKafka chatMessageToKafka = mapper.readValue(kafkaMessage, ChatMessageToKafka.class); //추후 DTO 변경 필수

// 클라이언트에게 허들 관련 데이터 전달하는 로직

log.info("dto ===> " + chatMessageToKafka.toString());
} catch (Exception ex) {
log.error(ex.getMessage(), ex); // 추후에 GlobalException 처리
}
}

@KafkaListener(
topics = "${topic.workspace}",
groupId = "${group.workspace}"
)
public void processChannelStatusMessage(String kafkaMessage) {
log.info("message ===> " + kafkaMessage);

ObjectMapper mapper = new ObjectMapper();

try {
ChatMessageToKafka chatMessageToKafka = mapper.readValue(kafkaMessage, ChatMessageToKafka.class); //추후 DTO 변경 필수

// 채널 생성 및 상태 정보 데이터 전달하는 로직

log.info("dto ===> " + chatMessageToKafka.toString());
} catch (Exception ex) {
log.error(ex.getMessage(), ex); // 추후에 GlobalException 처리
}
}
}
6 changes: 5 additions & 1 deletion src/backend/chat_server/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@ topic:
minute: jootalkpia.stock.prd.minute
chat: jootalkpia.chat.prd.message
push: jootalkpia.push.prd.message
hurdle: jootalkpia.hurdle.prd.status
workspace: jootalkpia.workspace.prd.channel

group:
minute: minute-price-save-consumer-group
minute: minute-price-save-consumer-group-${SERVER_PORT}
chat: chat-message-handle-consumer-group-${SERVER_PORT}
push: push-message-handle-consumer-group-${SERVER_PORT}
hurdle: hurdle-status-handle-consumer-group-${SERVER_PORT}
workspace: workspace-channel-handle-consumer-group-${SERVER_PORT}

server:
port: ${SERVER_PORT}
2 changes: 1 addition & 1 deletion src/backend/push_server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ repositories {

dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
Expand Down
3 changes: 3 additions & 0 deletions src/backend/signaling_server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ dependencies {
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'

implementation 'org.springframework.boot:spring-boot-starter-data-redis'

implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
}

tasks.named('test') {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.jootalkpia.signaling_server.model;

public record MessageToKafka() {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.jootalkpia.signaling_server.service;

import com.google.gson.Gson;
import com.jootalkpia.signaling_server.model.MessageToKafka;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final Gson gson = new Gson();
private final KafkaTemplate<String, String> kafkaTemplate;

public void sendHurdleStatusMessage(MessageToKafka messageToKafka) { // 미르님 원하는 DTO로 변경 필수
String jsonHurdleStatusMessage = gson.toJson(messageToKafka);

kafkaTemplate.send("jootalkpia.hurdle.prd.status", jsonHurdleStatusMessage).whenComplete((result, ex) -> {
if (ex == null) {
log.info(result.toString());
} else {
log.error(ex.getMessage(), ex); //추후 예외처리
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ spring:
broker:
enabled: true
destinationPrefixes: /topic, /queue
kafka:
producer:
acks: all # 멱등성 프로듀서를 위해 all로 설정, 0 또는 1이면 enable.idempotence=true 불가
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
bootstrap-servers: ${KAFKA_SERVER}

kurento:
ws:
Expand Down
Binary file not shown.
3 changes: 3 additions & 0 deletions src/backend/workspace_server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation 'org.springframework.kafka:spring-kafka'
implementation group: 'com.google.code.gson', name: 'gson', version: '2.8.6'
compileOnly 'org.projectlombok:lombok'
runtimeOnly 'org.postgresql:postgresql:42.7.4'
annotationProcessor 'org.projectlombok:lombok'
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"
testImplementation 'org.springframework.kafka:spring-kafka-test'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.jootalkpia.workspace_server.entity;

public record MessageToKafka() {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.jootalkpia.workspace_server.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.jootalkpia.workspace_server.entity.MessageToKafka;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {

private final ObjectMapper objectMapper;
private final KafkaTemplate<String, String> kafkaTemplate;

public void sendChannelStatusMessage(MessageToKafka messageToKafka) { // DTO 변경 필수
try {
String jsonMessage = objectMapper.writeValueAsString(messageToKafka);
kafkaTemplate.send("jootalkpia.workspace.prd.channel", jsonMessage)
.whenComplete((result, ex) -> { //키 값 설정으로 순서 보장, 실시간성이 떨어짐, 고민해봐야 할 부분
if (ex == null) {
log.info("Kafka message sent: {}", result.toString());
} else {
log.error("Error sending Kafka message: {}", ex.getMessage(), ex);
}
});
} catch (Exception e) {
log.error("Error serializing chat message: {}", e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ spring:
org.hibernate.type.descriptor.sql: TRACE # SQL 매개변수 출력
com.jootalkpia.workspace_server.service: DEBUG # workspace 서비스 패키지 로그
org.springframework.data.redis: DEBUG # Redis 작업 로그
kafka:
bootstrap-servers: ${KAFKA_SERVER}
producer:
acks: all # 멱등성 프로듀서를 위해 all로 설정, 0 또는 1이면 enable.idempotence=true 불가
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
management:
endpoints:
web:
Expand Down