Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,56 @@ public void publishBatch(){
Object evt;
String topic;

// MPS 이벤트 발행 상세 로깅 추가
if (o.getEventType().startsWith("Mps")) {
log.info("MPS 이벤트 발행 시작 - OUTBOX ID: {}, EventType: {}, AggregateId: {}",
o.getId(), o.getEventType(), o.getAggregateId());
}

// eventType에 따라 적절한 이벤트 타입과 토픽 결정
if (o.getEventType().startsWith("PartOrder")) {
// PartOrder 관련 이벤트 (일반 PartOrder + MPS)
if (o.getEventType().startsWith("PartOrder") || o.getEventType().startsWith("Mps")) {
PartOrderEvent partOrderEvent = objectMapper.treeToValue(o.getPayload(), PartOrderEvent.class);
String eventJson = objectMapper.writeValueAsString(partOrderEvent);
topic = TOPIC_PART_ORDER;
evt = eventJson;

// MPS 이벤트인 경우 추가 로깅
if (o.getEventType().startsWith("Mps")) {
log.info("MPS 이벤트 JSON 변환 완료 - EventType: {}, Topic: {}, EventJson 길이: {}",
o.getEventType(), topic, eventJson.length());
}
} else {
FactoryEvent factoryEvent = objectMapper.treeToValue(o.getPayload(), FactoryEvent.class);
String eventJson = objectMapper.writeValueAsString(factoryEvent);
topic = TOPIC_FACTORY;
evt = eventJson;
}

// MPS 이벤트 Kafka 전송 로깅
if (o.getEventType().startsWith("Mps")) {
log.info("MPS 이벤트 Kafka 전송 시작 - EventType: {}, Topic: {}, Key: {}",
o.getEventType(), topic, String.valueOf(o.getAggregateId()));
}

kafkaTemplate.send(topic, String.valueOf(o.getAggregateId()), evt)
.get(5, TimeUnit.SECONDS);

o.markPublished();

// MPS 이벤트 발행 완료 로깅
if (o.getEventType().startsWith("Mps")) {
log.info("MPS 이벤트 Kafka 전송 완료 - OUTBOX ID: {}, EventType: {}, 상태: published",
o.getId(), o.getEventType());
}

} catch (Exception e){
// MPS 이벤트 발행 실패 로깅
if (o.getEventType().startsWith("Mps")) {
log.error("MPS 이벤트 Kafka 전송 실패 - OUTBOX ID: {}, EventType: {}, 오류: {}",
o.getId(), o.getEventType(), e.getMessage(), e);
}

int nextRetry = o.getRetryCount() + 1;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.sampoom.factory.api.factory.outbox.FactoryOutbox;
import com.sampoom.factory.api.factory.outbox.FactoryOutboxRepository;
import com.sampoom.factory.api.part.entity.PartOrder;
import com.sampoom.factory.api.part.entity.PartOrderType;
import com.sampoom.factory.api.part.event.PartOrderEvent;
import com.sampoom.factory.api.part.repository.PartProjectionRepository;
import com.sampoom.factory.api.factory.repository.FactoryProjectionRepository;
Expand Down Expand Up @@ -32,26 +33,52 @@ public class PartOrderEventService {

@Transactional
public void recordPartOrderCreated(PartOrder partOrder) {
log.info("부품 주문 생성 이벤트 발행 - 주문 ID: {}, 주문 코드: {}", partOrder.getId(), partOrder.getOrderCode());
enqueueEvent("PartOrderCreated", partOrder, nvl(partOrder.getVersion(), 0L), false);
// MPS 타입인 경우 별도 이벤트 타입 사용
if (partOrder.getOrderType() == PartOrderType.MPS) {
log.info("MPS 주문 생성 이벤트 발행 - 주문 ID: {}, 주문 코드: {}", partOrder.getId(), partOrder.getOrderCode());
enqueueEvent("MpsCreated", partOrder, nvl(partOrder.getVersion(), 0L), false);
} else {
log.info("부품 주문 생성 이벤트 발행 - 주문 ID: {}, 주문 코드: {}", partOrder.getId(), partOrder.getOrderCode());
enqueueEvent("PartOrderCreated", partOrder, nvl(partOrder.getVersion(), 0L), false);
}
}

@Transactional
public void recordPartOrderStatusChanged(PartOrder partOrder) {
log.info("부품 주문 상태 변경 이벤트 발행 - 주문 ID: {}, 상태: {}", partOrder.getId(), partOrder.getStatus());
enqueueEvent("PartOrderStatusChanged", partOrder, nvl(partOrder.getVersion(), 0L), false);
// MPS 타입인 경우 별도 이벤트 타입 사용
if (partOrder.getOrderType() == PartOrderType.MPS) {
log.info("MPS 주문 상태 변경 이벤트 발행 - 주문 ID: {}, 상태: {}", partOrder.getId(), partOrder.getStatus());
enqueueEvent("MpsStatusChanged", partOrder, nvl(partOrder.getVersion(), 0L), false);
} else {
log.info("부품 주문 상태 변경 이벤트 발행 - 주문 ID: {}, 상태: {}", partOrder.getId(), partOrder.getStatus());
enqueueEvent("PartOrderStatusChanged", partOrder, nvl(partOrder.getVersion(), 0L), false);
}
}

@Transactional
public void recordPartOrderCompleted(PartOrder partOrder) {
log.info("부품 주문 완료 이벤트 발행 - 주문 ID: {}, 주문 코드: {}", partOrder.getId(), partOrder.getOrderCode());
enqueueEvent("PartOrderCompleted", partOrder, nvl(partOrder.getVersion(), 0L), false);
// MPS 타입인 경우 별도 이벤트 타입 사용
if (partOrder.getOrderType() == PartOrderType.MPS) {
log.info("MPS 주문 완료 이벤트 발행 - 주문 ID: {}, 주문 코드: {}, orderType: {}",
partOrder.getId(), partOrder.getOrderCode(), partOrder.getOrderType());
enqueueEvent("MpsCompleted", partOrder, nvl(partOrder.getVersion(), 0L), false);
log.info("MPS 주문 완료 이벤트 OUTBOX 저장 완료 - 주문 ID: {}, eventType: MpsCompleted", partOrder.getId());
} else {
log.info("부품 주문 완료 이벤트 발행 - 주문 ID: {}, 주문 코드: {}", partOrder.getId(), partOrder.getOrderCode());
enqueueEvent("PartOrderCompleted", partOrder, nvl(partOrder.getVersion(), 0L), false);
}
}

@Transactional
public void recordPartOrderDeleted(PartOrder partOrder) {
log.info("부품 주문 삭제 이벤트 발행 - 주문 ID: {}, 주문 코드: {}", partOrder.getId(), partOrder.getOrderCode());
enqueueEvent("PartOrderDeleted", partOrder, nvl(partOrder.getVersion(), 0L), true);
// MPS 타입인 경우 별도 이벤트 타입 사용
if (partOrder.getOrderType() == PartOrderType.MPS) {
log.info("MPS 주문 삭제 이벤트 발행 - 주문 ID: {}, 주문 코드: {}", partOrder.getId(), partOrder.getOrderCode());
enqueueEvent("MpsDeleted", partOrder, nvl(partOrder.getVersion(), 0L), true);
} else {
log.info("부품 주문 삭제 이벤트 발행 - 주문 ID: {}, 주문 코드: {}", partOrder.getId(), partOrder.getOrderCode());
enqueueEvent("PartOrderDeleted", partOrder, nvl(partOrder.getVersion(), 0L), true);
}
}

// ===== 공통 헬퍼 =====
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,15 @@ public PageResponseDto<PartOrderResponseDto> getPartOrders(Long factoryId, PartO
PartOrderStatus originalStatus = partOrder.getStatus();
partOrder.calculateProgressByDate();

// 상태가 변경된 경우 DB에 저장
// 상태가 변경된 경우 DB에 저장하고 이벤트 발행
if (!originalStatus.equals(partOrder.getStatus())) {
partOrderRepository.save(partOrder);

// IN_PROGRESS에서 COMPLETED로 변경된 경우 완료 이벤트 발행
if (originalStatus == PartOrderStatus.IN_PROGRESS &&
partOrder.getStatus() == PartOrderStatus.COMPLETED) {
partOrderEventService.recordPartOrderCompleted(partOrder);
}
}

return toResponseDto(partOrder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void handleOrderToFactoryEvent(String message) {
handleEvent(message, "OrderToFactoryEvent", OrderToFactoryEventDto.class, orderToFactoryEventService::processOrderToFactoryEvent);
}

@KafkaListener(topics = "part-forecast-events", groupId = "sampoom-factory-test5")
@KafkaListener(topics = "part-forecast-events", groupId = "${spring.kafka.consumer.group-id}")
public void handlePartForecastEvent(String message) {
handleEvent(message, "PartForecastEvent", PartForecastEvent.class, mpsEventService::processPartForecastEvent);
}
Expand Down