diff --git a/build.gradle b/build.gradle index 7a45437..fdb7e88 100644 --- a/build.gradle +++ b/build.gradle @@ -58,6 +58,9 @@ dependencies { annotationProcessor 'jakarta.persistence:jakarta.persistence-api:3.1.0' annotationProcessor 'jakarta.annotation:jakarta.annotation-api:2.1.1' + // kafak + implementation 'org.springframework.kafka:spring-kafka' + } tasks.named('test') { diff --git a/docker-compose.local.yml b/docker-compose.local.yml new file mode 100644 index 0000000..4624aca --- /dev/null +++ b/docker-compose.local.yml @@ -0,0 +1,35 @@ +# 개발환경 도커 컴포즈 + +# 로컬 개발 환경을 위한 서비스 정의 +services: + # 로컬 Redis + redis: + image: redis:latest + container_name: bubblog-local-redis # 운영 환경과의 충돌 방지를 위해 이름 명시 + ports: + - "6379:6379" + volumes: + - redis_data_local:/data + + # 로컬 Kafka + kafka: + image: 'bitnami/kafka:latest' + container_name: bubblog-local-kafka + ports: + - "9092:9092" + environment: + - KAFKA_CFG_NODE_ID=1 # Kafka 클러스터 내에서 각 노드를 식별하는 고유 ID + - KAFKA_CFG_PROCESS_ROLES=broker,controller # 이 Kafka 노드가 수행할 역할을 지정. 브로커(메세지 관리) 겸 컨트롤러(클러스터 관리) + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER # 컨트롤러(관리자)용 통신에 사용되는 리스너 채널 이름 지정 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT # 여러 broker 노드들끼리 서로 통신할 때 사용할 통신 채널의 이름 + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 # 1@kafka:9093 : 'ID가 1이고 주소가 kafka:9093인 노드가 투표권을 가진다.' + - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true # 존재하지 않는 토픽(Topic)으로 메시지를 보냈을 때 토픽을 자동으로 생성할지 여부 + volumes: + - kafka_data_local:/bitnami/kafka +# 데이터 유지를 위한 로컬 볼륨 +volumes: + redis_data_local: + kafka_data_local: \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index ef3f6a4..64827bc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,10 +14,33 @@ services: restart: always depends_on: - redis + - kafka ports: - "8080:8080" env_file: - .env + # 직접 Kafka 설치하지 않고 Docker를 이용해 컨테이너 환경으로 실행 + kafka: + image: 'bitnami/kafka:latest' + container_name: kafka + restart: always + ports: + - "9092:9092" + environment: + - KAFKA_CFG_NODE_ID=1 + - KAFKA_CFG_PROCESS_ROLES=broker,controller + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + # Docker 내부 통신용 리스너와 외부 접속용 리스너를 함께 설정 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_EXTERNAL://${KAFKA_HOST_IP}:9092 + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 + - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true + volumes: + - kafka_data:/bitnami/kafka + volumes: - redis_data: \ No newline at end of file + redis_data: + kafka_data: \ No newline at end of file diff --git a/src/main/java/Bubble/bubblog/domain/post/service/BlogPostServiceImpl.java b/src/main/java/Bubble/bubblog/domain/post/service/BlogPostServiceImpl.java index 1cbc4cf..d17a65a 100644 --- a/src/main/java/Bubble/bubblog/domain/post/service/BlogPostServiceImpl.java +++ b/src/main/java/Bubble/bubblog/domain/post/service/BlogPostServiceImpl.java @@ -18,9 +18,11 @@ import Bubble.bubblog.domain.tag.repository.TagRepository; import Bubble.bubblog.domain.user.entity.User; import Bubble.bubblog.domain.user.repository.UserRepository; +import Bubble.bubblog.global.dto.kafka.EmbeddingRequestKafkaDTO; +import Bubble.bubblog.global.dto.kafka.EmbeddingType; import Bubble.bubblog.global.exception.CustomException; import Bubble.bubblog.global.exception.ErrorCode; -import Bubble.bubblog.global.service.AiService; +import Bubble.bubblog.global.service.KafkaProducerService; import lombok.RequiredArgsConstructor; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -44,9 +46,13 @@ public class BlogPostServiceImpl implements BlogPostService { private final CategoryClosureRepository categoryClosureRepository; private final TagRepository tagRepository; private final PostTagRepository postTagRepository; - private final AiService aiService; + private final KafkaProducerService kafkaProducerService; + // private final AiService aiService; // 코드 리뷰 후 필요없다 판단되면 제거 예정 -> 기존 webclient 방식 private final CommentRepository commentRepository; + // 토픽 이름 상수 설정 + private static final String KAFKA_TOPIC = "Embedding-Request-Topic"; + @Transactional @Override public BlogPostDetailDTO createPost(BlogPostRequestDTO request, UUID userId) { @@ -81,8 +87,17 @@ public BlogPostDetailDTO createPost(BlogPostRequestDTO request, UUID userId) { } // AI 서버에 임베딩 요청 - aiService.handlePostTitle(post.getId(), post.getTitle()); - aiService.handlePostContent(post.getId(), post.getContent()); + // aiService.handlePostTitle(post.getId(), post.getTitle()); + // aiService.handlePostContent(post.getId(), post.getContent()); + + /** Kafka Producer를 호출 */ + // 제목과 내용 임베딩 요청 메세지 생성 + EmbeddingRequestKafkaDTO titleRequest = new EmbeddingRequestKafkaDTO(post.getId(), post.getTitle(), EmbeddingType.TITLE); + EmbeddingRequestKafkaDTO contentRequest = new EmbeddingRequestKafkaDTO(post.getId(), post.getContent(), EmbeddingType.CONTENT); + + // 메세지 발송 + kafkaProducerService.sendMessage(KAFKA_TOPIC, titleRequest); + kafkaProducerService.sendMessage(KAFKA_TOPIC, contentRequest); return new BlogPostDetailDTO(post, categoryList, tags); } @@ -203,10 +218,14 @@ public BlogPostDetailDTO updatePost(Long postId, BlogPostRequestDTO request, UUI // 분기 처리 if (titleChanged) { - aiService.handlePostTitle(post.getId(), request.getTitle()); + // aiService.handlePostTitle(post.getId(), request.getTitle()); + EmbeddingRequestKafkaDTO titleRequest = new EmbeddingRequestKafkaDTO(post.getId(), request.getTitle(), EmbeddingType.TITLE); + kafkaProducerService.sendMessage(KAFKA_TOPIC, titleRequest); } if (contentChanged) { - aiService.handlePostContent(post.getId(), request.getContent()); + // aiService.handlePostContent(post.getId(), request.getContent()); + EmbeddingRequestKafkaDTO contentRequest = new EmbeddingRequestKafkaDTO(post.getId(), request.getContent(), EmbeddingType.CONTENT); + kafkaProducerService.sendMessage(KAFKA_TOPIC, contentRequest); } post.update( diff --git a/src/main/java/Bubble/bubblog/global/config/KafkaProducerConfig.java b/src/main/java/Bubble/bubblog/global/config/KafkaProducerConfig.java new file mode 100644 index 0000000..712e9fc --- /dev/null +++ b/src/main/java/Bubble/bubblog/global/config/KafkaProducerConfig.java @@ -0,0 +1,41 @@ +package Bubble.bubblog.global.config; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaProducerConfig { + + // 접속할 kafka 브로커의 주소 목록 (우리 프로젝트에서는 우선 하나) + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + // Kafka Producer 인스턴스를 생성하는 데 필요한 설정값들을 정의 + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + // Kafka 브로커의 주소를 설정 + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + // 메시지의 key를 직렬화할 때 사용할 클래스를 지정 + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + // 메시지의 value를 직렬화할 때 사용할 클래스를 지정. 여기서는 JSON 형태로 보낼 것이므로 JsonSerializer를 사용. + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + // Kafka 메시지를 보내는 데 사용될 KafkaTemplate을 Bean으로 등록. + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} \ No newline at end of file diff --git a/src/main/java/Bubble/bubblog/global/config/SecurityConfig.java b/src/main/java/Bubble/bubblog/global/config/SecurityConfig.java index ec7d049..eceac28 100644 --- a/src/main/java/Bubble/bubblog/global/config/SecurityConfig.java +++ b/src/main/java/Bubble/bubblog/global/config/SecurityConfig.java @@ -52,6 +52,8 @@ public SecurityFilterChain filterChain(HttpSecurity http) throws Exception { .requestMatchers(HttpMethod.GET, "/api/users/{userId}").permitAll() // 댓글 조회 관련 API 허용 .requestMatchers(HttpMethod.GET, "/api/comments/**").permitAll() + // 카프카 테스트 API + .requestMatchers(HttpMethod.POST, "/kafka/publish").permitAll() // 그 외 요청은 인증 필요 .anyRequest().authenticated() ) diff --git a/src/main/java/Bubble/bubblog/global/controller/KafkaTestController.java b/src/main/java/Bubble/bubblog/global/controller/KafkaTestController.java new file mode 100644 index 0000000..ee73cb7 --- /dev/null +++ b/src/main/java/Bubble/bubblog/global/controller/KafkaTestController.java @@ -0,0 +1,35 @@ +package Bubble.bubblog.global.controller; + +import Bubble.bubblog.global.service.KafkaProducerService; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.HashMap; +import java.util.Map; + +/** 테스트 용으로 만든 컨트롤러 */ + +@RestController +@RequestMapping("/kafka") +@RequiredArgsConstructor +public class KafkaTestController { + private final KafkaProducerService producerService; + + // 테스트를 위해 임시로 만든 API + @PostMapping("/publish") + public String sendMessage(@RequestParam("message") String message) { + + // 실제로는 DTO 객체를 만들어서 보내는 것이 좋습니다. + // 테스트를 위해 간단히 Map을 사용합니다. + Map messageObject = new HashMap<>(); + messageObject.put("content", message); + + // "bubblog-topic" 이라는 토픽으로 메시지를 보냅니다. + producerService.sendMessage("bubblog-topic", messageObject); + + return "Message sent to Kafka topic: bubblog-topic"; + } +} diff --git a/src/main/java/Bubble/bubblog/global/dto/kafka/EmbeddingRequestKafkaDTO.java b/src/main/java/Bubble/bubblog/global/dto/kafka/EmbeddingRequestKafkaDTO.java new file mode 100644 index 0000000..bc760ea --- /dev/null +++ b/src/main/java/Bubble/bubblog/global/dto/kafka/EmbeddingRequestKafkaDTO.java @@ -0,0 +1,7 @@ +package Bubble.bubblog.global.dto.kafka; + +public record EmbeddingRequestKafkaDTO( + Long postId, + String text, // 내용 + EmbeddingType type // TITLE, CONTENT +) {} diff --git a/src/main/java/Bubble/bubblog/global/dto/kafka/EmbeddingType.java b/src/main/java/Bubble/bubblog/global/dto/kafka/EmbeddingType.java new file mode 100644 index 0000000..751d560 --- /dev/null +++ b/src/main/java/Bubble/bubblog/global/dto/kafka/EmbeddingType.java @@ -0,0 +1,5 @@ +package Bubble.bubblog.global.dto.kafka; + +public enum EmbeddingType { + TITLE, CONTENT +} diff --git a/src/main/java/Bubble/bubblog/global/service/KafkaProducerService.java b/src/main/java/Bubble/bubblog/global/service/KafkaProducerService.java new file mode 100644 index 0000000..eb94d84 --- /dev/null +++ b/src/main/java/Bubble/bubblog/global/service/KafkaProducerService.java @@ -0,0 +1,20 @@ +package Bubble.bubblog.global.service; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class KafkaProducerService { + + private final KafkaTemplate kafkaTemplate; + + // 메시지를 지정된 토픽으로 보내는 메서드 + public void sendMessage(String topic, Object message) { + log.info("Sending message to Kafka topic: {}, message: {}", topic, message); + kafkaTemplate.send(topic, message); + } +} diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index e70be43..d6a26ab 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -20,13 +20,19 @@ spring: show-sql: true properties: hibernate: - format_sql: true + format_sql: false database: postgresql data: redis: - host: ${REDIS_HOST} - port: ${REDIS_PORT} + host: localhost + port: 6379 + + kafka: + bootstrap-servers: localhost:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer cloud: aws: diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 89172c3..b03e59e 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -24,6 +24,12 @@ spring: host: ${REDIS_HOST} port: ${REDIS_PORT} + kafka: + bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS} # Kafka 서버의 주소 + producer: # 메시지를 보낼 때 사용할 규칙들 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + cloud: aws: credentials: diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3535d46..9ae1a3b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -16,7 +16,7 @@ springdoc: logging: level: - root: DEBUG - org.springframework: DEBUG + root: INFO + org.springframework: WARN org.hibernate.SQL: DEBUG