Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ dependencies {
annotationProcessor 'jakarta.persistence:jakarta.persistence-api:3.1.0'
annotationProcessor 'jakarta.annotation:jakarta.annotation-api:2.1.1'

// kafka
implementation 'org.springframework.kafka:spring-kafka'

}

tasks.named('test') {
Expand Down
35 changes: 35 additions & 0 deletions docker-compose.local.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# 개발환경 도커 컴포즈

# 로컬 개발 환경을 위한 서비스 정의
services:
# 로컬 Redis
redis:
image: redis:latest
container_name: bubblog-local-redis # 운영 환경과의 충돌 방지를 위해 이름 명시
ports:
- "6379:6379"
volumes:
- redis_data_local:/data

# 로컬 Kafka
kafka:
image: 'bitnami/kafka:latest'
container_name: bubblog-local-kafka
ports:
- "9092:9092"
environment:
- KAFKA_CFG_NODE_ID=1 # Kafka 클러스터 내에서 각 노드를 식별하는 고유 ID
- KAFKA_CFG_PROCESS_ROLES=broker,controller # 이 Kafka 노드가 수행할 역할을 지정. 브로커(메세지 관리) 겸 컨트롤러(클러스터 관리)
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER # 컨트롤러(관리자)용 통신에 사용되는 리스너 채널 이름 지정
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT # 여러 broker 노드들끼리 서로 통신할 때 사용할 통신 채널의 이름
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 # 1@kafka:9093 : 'ID가 1이고 주소가 kafka:9093인 노드가 투표권을 가진다.'
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true # 존재하지 않는 토픽(Topic)으로 메시지를 보냈을 때 토픽을 자동으로 생성할지 여부
volumes:
- kafka_data_local:/bitnami/kafka
# 데이터 유지를 위한 로컬 볼륨
volumes:
redis_data_local:
kafka_data_local:
25 changes: 24 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,33 @@ services:
restart: always
depends_on:
- redis
- kafka
ports:
- "8080:8080"
env_file:
- .env

# 직접 Kafka 설치하지 않고 Docker를 이용해 컨테이너 환경으로 실행
kafka:
image: 'bitnami/kafka:latest'
container_name: kafka
restart: always
ports:
- "9092:9092"
environment:
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# Docker 내부 통신용 리스너와 외부 접속용 리스너를 함께 설정
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_EXTERNAL://${KAFKA_HOST_IP}:9092
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
volumes:
- kafka_data:/bitnami/kafka

volumes:
redis_data:
redis_data:
kafka_data:
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import Bubble.bubblog.domain.tag.repository.TagRepository;
import Bubble.bubblog.domain.user.entity.User;
import Bubble.bubblog.domain.user.repository.UserRepository;
import Bubble.bubblog.global.dto.kafka.EmbeddingRequestKafkaDTO;
import Bubble.bubblog.global.dto.kafka.EmbeddingType;
import Bubble.bubblog.global.exception.CustomException;
import Bubble.bubblog.global.exception.ErrorCode;
import Bubble.bubblog.global.service.AiService;
import Bubble.bubblog.global.service.KafkaProducerService;
import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
Expand All @@ -44,9 +46,13 @@ public class BlogPostServiceImpl implements BlogPostService {
private final CategoryClosureRepository categoryClosureRepository;
private final TagRepository tagRepository;
private final PostTagRepository postTagRepository;
private final AiService aiService;
private final KafkaProducerService kafkaProducerService;
// private final AiService aiService; // 코드 리뷰 후 필요없다 판단되면 제거 예정 -> 기존 webclient 방식
private final CommentRepository commentRepository;

// 토픽 이름 상수 설정
private static final String KAFKA_TOPIC = "Embedding-Request-Topic";

@Transactional
@Override
public BlogPostDetailDTO createPost(BlogPostRequestDTO request, UUID userId) {
Expand Down Expand Up @@ -81,8 +87,17 @@ public BlogPostDetailDTO createPost(BlogPostRequestDTO request, UUID userId) {
}

// AI 서버에 임베딩 요청
aiService.handlePostTitle(post.getId(), post.getTitle());
aiService.handlePostContent(post.getId(), post.getContent());
// aiService.handlePostTitle(post.getId(), post.getTitle());
// aiService.handlePostContent(post.getId(), post.getContent());

/** Kafka Producer를 호출 */
// 제목과 내용 임베딩 요청 메세지 생성
EmbeddingRequestKafkaDTO titleRequest = new EmbeddingRequestKafkaDTO(post.getId(), post.getTitle(), EmbeddingType.TITLE);
EmbeddingRequestKafkaDTO contentRequest = new EmbeddingRequestKafkaDTO(post.getId(), post.getContent(), EmbeddingType.CONTENT);

// 메세지 발송
kafkaProducerService.sendMessage(KAFKA_TOPIC, titleRequest);
kafkaProducerService.sendMessage(KAFKA_TOPIC, contentRequest);

return new BlogPostDetailDTO(post, categoryList, tags);
}
Expand Down Expand Up @@ -203,10 +218,14 @@ public BlogPostDetailDTO updatePost(Long postId, BlogPostRequestDTO request, UUI

// 분기 처리
if (titleChanged) {
aiService.handlePostTitle(post.getId(), request.getTitle());
// aiService.handlePostTitle(post.getId(), request.getTitle());
EmbeddingRequestKafkaDTO titleRequest = new EmbeddingRequestKafkaDTO(post.getId(), request.getTitle(), EmbeddingType.TITLE);
kafkaProducerService.sendMessage(KAFKA_TOPIC, titleRequest);
}
if (contentChanged) {
aiService.handlePostContent(post.getId(), request.getContent());
// aiService.handlePostContent(post.getId(), request.getContent());
EmbeddingRequestKafkaDTO contentRequest = new EmbeddingRequestKafkaDTO(post.getId(), request.getContent(), EmbeddingType.CONTENT);
kafkaProducerService.sendMessage(KAFKA_TOPIC, contentRequest);
}

post.update(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package Bubble.bubblog.global.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

// Configures the Kafka message producer: how to reach the broker(s) and how
// outgoing records are serialized (String keys, JSON values).
@Configuration
public class KafkaProducerConfig {

    // Address list of the Kafka broker(s) to connect to (a single broker for now).
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Builds the factory that creates Kafka Producer instances.
     *
     * @return a producer factory producing String-keyed, JSON-valued records
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        // Broker address(es) the producer connects to.
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Record keys are serialized as plain strings.
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Record values are serialized to JSON.
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * KafkaTemplate used by services to publish messages.
     *
     * The {@link ProducerFactory} is injected as a method parameter instead of
     * calling {@code producerFactory()} directly: the direct call relies on
     * CGLIB proxying of this {@code @Configuration} class, while parameter
     * injection also works with {@code @Configuration(proxyBeanMethods = false)}.
     *
     * @param producerFactory the producer factory bean defined above
     * @return the template for sending messages to Kafka
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
.requestMatchers(HttpMethod.GET, "/api/users/{userId}").permitAll()
// 댓글 조회 관련 API 허용
.requestMatchers(HttpMethod.GET, "/api/comments/**").permitAll()
// 카프카 테스트 API
.requestMatchers(HttpMethod.POST, "/kafka/publish").permitAll()
// 그 외 요청은 인증 필요
.anyRequest().authenticated()
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package Bubble.bubblog.global.controller;

import Bubble.bubblog.global.service.KafkaProducerService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/** Controller created solely for manual testing of the Kafka pipeline. */

@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaTestController {
    private final KafkaProducerService producerService;

    // Throwaway endpoint used to verify the producer path end to end.
    @PostMapping("/publish")
    public String sendMessage(@RequestParam("message") String message) {
        // A real caller should publish a dedicated DTO; a simple key/value
        // map is sufficient for this smoke test.
        Map<String, String> payload = new HashMap<>();
        payload.put("content", message);

        // Publish to the topic named "bubblog-topic".
        producerService.sendMessage("bubblog-topic", payload);

        return "Message sent to Kafka topic: bubblog-topic";
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package Bubble.bubblog.global.dto.kafka;

// Kafka message payload requesting an embedding for a piece of blog-post text.
// Sent by BlogPostServiceImpl on post create/update.
public record EmbeddingRequestKafkaDTO(
        Long postId,       // ID of the blog post the text belongs to
        String text,       // the text to embed (the post's title or content)
        EmbeddingType type // which field the text came from: TITLE or CONTENT
) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package Bubble.bubblog.global.dto.kafka;

// Distinguishes whether the text in an embedding request is a post's
// title or its content (see EmbeddingRequestKafkaDTO).
public enum EmbeddingType {
    TITLE, CONTENT
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package Bubble.bubblog.global.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducerService {

private final KafkaTemplate<String, Object> kafkaTemplate;

// 메시지를 지정된 토픽으로 보내는 메서드
public void sendMessage(String topic, Object message) {
log.info("Sending message to Kafka topic: {}, message: {}", topic, message);
kafkaTemplate.send(topic, message);
}
}
12 changes: 9 additions & 3 deletions src/main/resources/application-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@ spring:
show-sql: true
properties:
hibernate:
format_sql: true
format_sql: false
database: postgresql

data:
redis:
host: ${REDIS_HOST}
port: ${REDIS_PORT}
host: localhost
port: 6379

kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

cloud:
aws:
Expand Down
6 changes: 6 additions & 0 deletions src/main/resources/application-prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ spring:
host: ${REDIS_HOST}
port: ${REDIS_PORT}

kafka:
bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS} # Kafka 서버의 주소
producer: # 메시지를 보낼 때 사용할 규칙들
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

cloud:
aws:
credentials:
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ springdoc:

logging:
level:
root: DEBUG
org.springframework: DEBUG
root: INFO
org.springframework: WARN
org.hibernate.SQL: DEBUG