Skip to content

Commit 6276479

Browse files
committed
kafka
1 parent fb3c42a commit 6276479

6 files changed

Lines changed: 82 additions & 28 deletions

File tree

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.gearfirst.warehouse.api.dto;
2+
3+
import lombok.Builder;
4+
import lombok.Data;
5+
6+
@Data
7+
@Builder
8+
public class NotificationDto {
9+
private Long id;
10+
private String eventId;
11+
private String type;
12+
private String message;
13+
private String receiver;
14+
private boolean read = false;
15+
}

src/main/java/com/gearfirst/warehouse/api/dto/TestDto.java

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/main/java/com/gearfirst/warehouse/api/receiving/service/ReceivingServiceImpl.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.gearfirst.warehouse.api.receiving.service;
22

3+
import com.gearfirst.warehouse.api.dto.NotificationDto;
34
import com.gearfirst.warehouse.api.inventory.service.InventoryService;
45
import com.gearfirst.warehouse.api.parts.persistence.PartJpaRepository;
56
import com.gearfirst.warehouse.api.receiving.domain.ReceivingLineStatus;
@@ -23,13 +24,10 @@
2324
import com.gearfirst.warehouse.common.util.DateTimes;
2425
import java.time.OffsetDateTime;
2526
import java.time.ZoneOffset;
26-
import java.util.ArrayList;
27-
import java.util.Comparator;
28-
import java.util.HashSet;
29-
import java.util.List;
30-
import java.util.Objects;
31-
import java.util.Set;
27+
import java.util.*;
28+
3229
import lombok.RequiredArgsConstructor;
30+
import org.springframework.kafka.core.KafkaTemplate;
3331
import org.springframework.stereotype.Service;
3432

3533
@Service
@@ -43,6 +41,7 @@ public class ReceivingServiceImpl implements ReceivingService {
4341
private final NoteNumberGenerator noteNumberGenerator;
4442
private final InventoryService inventoryService;
4543
private final PartJpaRepository partRepository;
44+
private final KafkaTemplate<String, Object> kafkaTemplate;
4645

4746
@Override
4847
public List<ReceivingNoteSummaryResponse> getNotDone(String date) {
@@ -209,6 +208,17 @@ public ReceivingCompleteResponse complete(Long noteId, ReceivingCompleteRequest
209208
note.setCompletedAt(completedAt);
210209
repository.save(note);
211210

211+
String topic = "notification";
212+
NotificationDto n = NotificationDto.builder()
213+
.id(1L)
214+
.eventId(UUID.randomUUID().toString())
215+
.type("입고 요청 완료")
216+
.message("입고 요청이 완료되었습니다.")
217+
.receiver("본사")
218+
.build();
219+
220+
kafkaTemplate.send(topic, n);
221+
212222
return new ReceivingCompleteResponse(
213223
DateTimes.toKstString(completedAt),
214224
appliedSum
@@ -326,6 +336,18 @@ public ReceivingNoteDetailResponse create(ReceivingCreateNoteRequest request) {
326336
entity.addLine(le);
327337
}
328338
var saved = repository.save(entity);
339+
340+
String topic = "notification";
341+
NotificationDto n = NotificationDto.builder()
342+
.id(1L)
343+
.eventId(UUID.randomUUID().toString())
344+
.type("입고 요청 등록")
345+
.message("입고 요청이 등록되었습니다.")
346+
.receiver("본사")
347+
.build();
348+
349+
kafkaTemplate.send(topic, n);
350+
329351
return toDetail(saved);
330352
}
331353

src/main/java/com/gearfirst/warehouse/api/shipping/service/ShippingServiceImpl.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static com.gearfirst.warehouse.common.response.ErrorStatus.CONFLICT_NOTE_STATUS_WHILE_COMPLETE;
44

5+
import com.gearfirst.warehouse.api.dto.NotificationDto;
56
import com.gearfirst.warehouse.api.inventory.dto.OnHandDtos.OnHandSummary;
67
import com.gearfirst.warehouse.api.inventory.service.InventoryService;
78
import com.gearfirst.warehouse.api.parts.persistence.PartJpaRepository;
@@ -33,13 +34,11 @@
3334
import com.gearfirst.warehouse.common.util.DateTimes;
3435
import java.time.OffsetDateTime;
3536
import java.time.ZoneOffset;
36-
import java.util.ArrayList;
37-
import java.util.Comparator;
38-
import java.util.HashSet;
39-
import java.util.List;
40-
import java.util.Set;
37+
import java.util.*;
38+
4139
import lombok.RequiredArgsConstructor;
4240
import org.springframework.beans.factory.annotation.Autowired;
41+
import org.springframework.kafka.core.KafkaTemplate;
4342
import org.springframework.stereotype.Service;
4443
import org.springframework.transaction.annotation.Transactional;
4544

@@ -53,6 +52,7 @@ public class ShippingServiceImpl implements ShippingService {
5352
private final NoteNumberGenerator noteNumberGenerator;
5453
// Optional helper for product snapshot (nullable for tests)
5554
private final PartJpaRepository partRepository;
55+
private final KafkaTemplate<String, Object> kafkaTemplate;
5656

5757
// Optional Querydsl repository for unified list queries (nullable for tests)
5858
@Autowired(required = false)
@@ -398,6 +398,17 @@ public ShippingCompleteResponse complete(Long noteId, ShippingCompleteRequest re
398398
.build();
399399
repository.save(updated);
400400

401+
String topic = "notification";
402+
NotificationDto n = NotificationDto.builder()
403+
.id(1L)
404+
.eventId(UUID.randomUUID().toString())
405+
.type("부품 출고 완료")
406+
.message("부품 출고 요청이 완료되었습니다.")
407+
.receiver("본사")
408+
.build();
409+
410+
kafkaTemplate.send(topic, n);
411+
401412
return new ShippingCompleteResponse(completedAt, totalShipped);
402413
}
403414

@@ -519,6 +530,18 @@ public ShippingNoteDetailResponse create(ShippingCreateNoteRequest request) {
519530
.lines(lines)
520531
.build();
521532
var saved = repository.save(note);
533+
534+
String topic = "notification";
535+
NotificationDto n = NotificationDto.builder()
536+
.id(1L)
537+
.eventId(UUID.randomUUID().toString())
538+
.type("출고 요청 등록")
539+
.message("출고 요청이 등록되었습니다.")
540+
.receiver("본사")
541+
.build();
542+
543+
kafkaTemplate.send(topic, n);
544+
522545
return toDetail(saved);
523546
}
524547

src/main/java/com/gearfirst/warehouse/common/config/kafka/KafkaConfig.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
public class KafkaConfig {
1313

1414
/**
15-
* (4) DLQ (Dead Letter Queue)를 위한 에러 핸들러를 Bean으로 등록합니다. 이 핸들러는 메시지 처리에 실패하면, 지정된 횟수(3번)만큼 재시도하고, 그래도 실패하면 해당 메시지를
16-
* DLQ(Dead Letter Topic)로 자동 전송합니다.
15+
* (4) DLQ (Dead Letter Queue)를 위한 에러 핸들러를 Bean으로 등록합니다.
16+
* 이 핸들러는 메시지 처리에 실패하면, 지정된 횟수(3번)만큼 재시도하고,
17+
* 그래도 실패하면 해당 메시지를 DLQ(Dead Letter Topic)로 자동 전송합니다.
1718
*/
1819
@Bean
1920
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> template) {
@@ -38,4 +39,4 @@ public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> template)
3839
factory.setCommonErrorHandler(errorHandler);
3940
return factory;
4041
}
41-
}
42+
}

src/main/java/com/gearfirst/warehouse/common/config/kafka/KafkaConsumerConfig.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.gearfirst.warehouse.common.config.kafka;
22

3-
import com.gearfirst.warehouse.api.dto.TestDto;
4-
import java.util.Map;
3+
import com.gearfirst.warehouse.api.dto.NotificationDto;
54
import lombok.RequiredArgsConstructor;
65
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
76
import org.springframework.context.annotation.Bean;
@@ -11,20 +10,22 @@
1110
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
1211
import org.springframework.kafka.support.serializer.JsonDeserializer;
1312

13+
import java.util.Map;
14+
1415
@Configuration
1516
@RequiredArgsConstructor
1617
public class KafkaConsumerConfig {
1718
private final KafkaProperties properties;
1819

1920
@Bean
20-
public ConcurrentKafkaListenerContainerFactory<String, TestDto> testKafkaListenerContainerFactory() {
21-
ConcurrentKafkaListenerContainerFactory<String, TestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
21+
public ConcurrentKafkaListenerContainerFactory<String, NotificationDto> testKafkaListenerContainerFactory() {
22+
ConcurrentKafkaListenerContainerFactory<String, NotificationDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
2223

2324
factory.setConsumerFactory(testConsumerFactory());
2425
return factory;
2526
}
2627

27-
private ConsumerFactory<String, TestDto> testConsumerFactory() {
28+
private ConsumerFactory<String, NotificationDto> testConsumerFactory() {
2829
// 8. application.yml에서 가져온 기본 consumer 설정을 복사합니다.
2930
// (bootstrap-servers 주소 등이 여기에 포함됩니다.)
3031
Map<String, Object> props = properties.buildConsumerProperties(null);
@@ -37,7 +38,7 @@ private ConsumerFactory<String, TestDto> testConsumerFactory() {
3738

3839
// Value(메시지 내용)는 OrderDto.class로만 변환하고,
3940
// 타입 헤더(__TypeId__)는 무시(false)합니다.
40-
new JsonDeserializer<>(TestDto.class, false)
41+
new JsonDeserializer<>(NotificationDto.class, false)
4142
);
4243
}
4344
}

0 commit comments

Comments
 (0)