From bd16694423af2d4900e5e8469a7e73fe9b650c1e Mon Sep 17 00:00:00 2001 From: MiN <81948815+leesumin0526@users.noreply.github.com> Date: Wed, 1 Oct 2025 16:57:23 +1000 Subject: [PATCH] =?UTF-8?q?[REFACTOR/#181]=20-=20=EB=94=94=EB=B0=94?= =?UTF-8?q?=EC=9D=B4=EC=8A=A4=20=ED=86=A0=ED=81=B0=20=EC=97=90=EB=9F=AC=20?= =?UTF-8?q?=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../repository/DeviceTokenRepository.java | 7 + .../NotificationOutboxRepository.java | 11 ++ .../service/NotificationListener.java | 88 +++++++--- .../service/OutboxStatusService.java | 13 ++ .../assu/server/infra/firebase/FcmClient.java | 152 +++++++++++++----- 5 files changed, 207 insertions(+), 64 deletions(-) diff --git a/src/main/java/com/assu/server/domain/deviceToken/repository/DeviceTokenRepository.java b/src/main/java/com/assu/server/domain/deviceToken/repository/DeviceTokenRepository.java index 6720d390..ee0c8349 100644 --- a/src/main/java/com/assu/server/domain/deviceToken/repository/DeviceTokenRepository.java +++ b/src/main/java/com/assu/server/domain/deviceToken/repository/DeviceTokenRepository.java @@ -2,8 +2,10 @@ import com.assu.server.domain.deviceToken.entity.DeviceToken; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; +import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.Optional; @@ -12,5 +14,10 @@ public interface DeviceTokenRepository extends JpaRepository @Query("select dt.token from DeviceToken dt where dt.member.id =:memberId and dt.active=true") List findActiveTokensByMemberId(@Param("memberId") Long memberId); + @Transactional + @Modifying + @Query("update DeviceToken dt set dt.active = false where dt.token in :tokens") + void deactivateTokens(@Param("tokens") List tokens); + Optional findByToken(String token); } diff --git a/src/main/java/com/assu/server/domain/notification/repository/NotificationOutboxRepository.java b/src/main/java/com/assu/server/domain/notification/repository/NotificationOutboxRepository.java index 98948596..abbcfa1d 100644 --- a/src/main/java/com/assu/server/domain/notification/repository/NotificationOutboxRepository.java +++ b/src/main/java/com/assu/server/domain/notification/repository/NotificationOutboxRepository.java @@ -27,5 +27,16 @@ public interface NotificationOutboxRepository extends JpaRepository com.assu.server.domain.notification.entity.NotificationOutbox.Status.FAILED + """) + int markFailedById(@org.springframework.data.repository.query.Param("id") Long id); + + boolean existsByIdAndStatus(Long id, NotificationOutbox.Status status); + List findTop50ByStatusOrderByIdAsc(NotificationOutbox.Status status); } diff --git a/src/main/java/com/assu/server/domain/notification/service/NotificationListener.java b/src/main/java/com/assu/server/domain/notification/service/NotificationListener.java index 841f8964..30d40a1b 100644 --- a/src/main/java/com/assu/server/domain/notification/service/NotificationListener.java +++ b/src/main/java/com/assu/server/domain/notification/service/NotificationListener.java @@ -3,53 +3,99 @@ import com.assu.server.infra.firebase.AmqpConfig; import com.assu.server.infra.firebase.FcmClient; import com.assu.server.domain.notification.dto.NotificationMessageDTO; - +import com.google.firebase.messaging.FirebaseMessagingException; import com.rabbitmq.client.Channel; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; - import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; - @Slf4j @Component @RequiredArgsConstructor public class NotificationListener { private final FcmClient fcmClient; - private final OutboxStatusService outboxStatus; // ← 주입 + private final OutboxStatusService outboxStatus; @RabbitListener(queues = AmqpConfig.QUEUE, ackMode = "MANUAL") public void onMessage(@Payload NotificationMessageDTO payload, Channel ch, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { + + final Long outboxId = safeParseLong(payload.getIdempotencyKey()); + try { - fcmClient.sendToMemberId(payload.getReceiverId(), payload.getTitle(), payload.getBody(), payload.getData()); - Long outboxId = Long.valueOf(payload.getIdempotencyKey()); - outboxStatus.markSent(outboxId); // ★ ACK 전에 완료 표시 - - ch.basicAck(tag, false); // ★ 맨 마지막에 단 한 번만 ACK - } catch (RuntimeException e) { - if (isTransient(e)) ch.basicNack(tag, false, true); - else ch.basicNack(tag, false, false); + // 0) Outbox 선확인: 이미 처리되었으면 중복 전송/SELECT 자체를 스킵 + if (outboxId != null && outboxStatus.isAlreadySent(outboxId)) { + log.debug("[Notify] already-sent outboxId={}, ACK", outboxId); + ch.basicAck(tag, false); + return; + } + + // 1) 전송 + FcmClient.FcmResult result = fcmClient.sendToMemberId( + payload.getReceiverId(), payload.getTitle(), payload.getBody(), payload.getData()); + + // 2) 성공 처리 + if (outboxId != null) outboxStatus.markSent(outboxId); + ch.basicAck(tag, false); + + // 3) 관측용 로그 + log.info("[Notify] sent outboxId={} memberId={} success={} fail={} invalidTokens={}", + outboxId, payload.getReceiverId(), + result.successCount(), result.failureCount(), result.invalidTokens()); + + } catch (FirebaseMessagingException fme) { + boolean permanent = isPermanent(fme); + log.error("[Notify] FCM failure outboxId={} memberId={} permanent={} http={} code={} root={}", + outboxId, payload.getReceiverId(), permanent, + FcmClient.httpStatusOf(fme), fme.getMessagingErrorCode(), rootSummary(fme), fme); + + if (outboxId != null) outboxStatus.markFailed(outboxId); + ch.basicNack(tag, false, false); // requeue 금지(지연 재시도 큐 권장) + + } catch (java.net.UnknownHostException | javax.net.ssl.SSLHandshakeException e) { + // 환경 문제(DNS/CA)는 영구 취급(루프 방지) + log.error("[Notify] ENV failure outboxId={} memberId={} root={}", + outboxId, payload.getReceiverId(), rootSummary(e), e); + if (outboxId != null) outboxStatus.markFailed(outboxId); + ch.basicNack(tag, false, false); + + } catch (java.util.concurrent.TimeoutException | java.net.SocketTimeoutException e) { + // 타임아웃 → 일시 장애. 그래도 requeue(true)는 쓰지 않음 + log.warn("[Notify] TIMEOUT outboxId={} memberId={} root={}", + outboxId, payload.getReceiverId(), rootSummary(e), e); + if (outboxId != null) outboxStatus.markFailed(outboxId); + ch.basicNack(tag, false, false); + } catch (Exception e) { + // 알 수 없는 오류 → DLQ + log.error("[Notify] UNKNOWN failure outboxId={} memberId={} root={}", + outboxId, payload.getReceiverId(), rootSummary(e), e); + if (outboxId != null) outboxStatus.markFailed(outboxId); ch.basicNack(tag, false, false); } } - private boolean isTransient(Throwable t) { - while (t != null) { - if (t instanceof java.util.concurrent.TimeoutException - || t instanceof java.net.SocketTimeoutException - || t instanceof java.io.IOException) { - return true; - } - t = t.getCause(); - } + private boolean isPermanent(FirebaseMessagingException fme) { + var code = fme.getMessagingErrorCode(); + Integer http = FcmClient.httpStatusOf(fme); + if (code == com.google.firebase.messaging.MessagingErrorCode.UNREGISTERED + || code == com.google.firebase.messaging.MessagingErrorCode.INVALID_ARGUMENT) return true; + if (http != null && (http == 401 || http == 403)) return true; return false; } + + private String rootSummary(Throwable t) { + Throwable r = t; while (r.getCause() != null) r = r.getCause(); + return r.getClass().getName() + ": " + String.valueOf(r.getMessage()); + } + + private Long safeParseLong(String s) { + try { return s == null ? null : Long.valueOf(s); } catch (Exception ignore) { return null; } + } } \ No newline at end of file diff --git a/src/main/java/com/assu/server/domain/notification/service/OutboxStatusService.java b/src/main/java/com/assu/server/domain/notification/service/OutboxStatusService.java index 13412207..80e4778a 100644 --- a/src/main/java/com/assu/server/domain/notification/service/OutboxStatusService.java +++ b/src/main/java/com/assu/server/domain/notification/service/OutboxStatusService.java @@ -1,5 +1,6 @@ package com.assu.server.domain.notification.service; +import com.assu.server.domain.notification.entity.NotificationOutbox; import com.assu.server.domain.notification.repository.NotificationOutboxRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -24,4 +25,16 @@ public void markSent(Long id) { int updated = repo.markSentById(id); log.info("[OutboxStatus] SENT updated={} outboxId={}", updated, id); } + + @Transactional(readOnly = true) + public boolean isAlreadySent(Long id) { + return repo.existsByIdAndStatus(id, NotificationOutbox.Status.SENT); + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void markFailed(Long id) { + int updated = repo.markFailedById(id); + log.info("[OutboxStatus] FAILED updated={} outboxId={}", updated, id); + } + } diff --git a/src/main/java/com/assu/server/infra/firebase/FcmClient.java b/src/main/java/com/assu/server/infra/firebase/FcmClient.java index 81fb0400..78c3942b 100644 --- a/src/main/java/com/assu/server/infra/firebase/FcmClient.java +++ b/src/main/java/com/assu/server/infra/firebase/FcmClient.java @@ -2,16 +2,16 @@ import com.assu.server.domain.deviceToken.repository.DeviceTokenRepository; import com.google.api.core.ApiFuture; -import com.google.firebase.messaging.AndroidConfig; -import com.google.firebase.messaging.FirebaseMessaging; -import com.google.firebase.messaging.Message; +import com.google.firebase.messaging.*; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -19,58 +19,124 @@ @Component @RequiredArgsConstructor public class FcmClient { + private final FirebaseMessaging messaging; private final DeviceTokenRepository tokenRepo; - // 전송 타임아웃 - private static final Duration SEND_TIMEOUT = Duration.ofSeconds(3); + // 운영에서 3s는 다소 공격적 — 5s 정도 권장 + private static final Duration SEND_TIMEOUT = Duration.ofSeconds(5); - public void sendToMemberId(Long memberId, String title, String body, Map data) { - if (memberId == null) { - throw new IllegalArgumentException("receiverId is null"); - } + /** + * 멤버의 활성 토큰 전체에 멀티캐스트 전송. + * - 실패 토큰(UNREGISTERED/INVALID_ARGUMENT)은 즉시 비활성화 + * - 결과 요약을 반환 + */ + public FcmResult sendToMemberId(Long memberId, String title, String body, Map data) + throws TimeoutException, InterruptedException, FirebaseMessagingException, ExecutionException { + if (memberId == null) throw new IllegalArgumentException("receiverId is null"); // 1) 토큰 조회 List tokens = tokenRepo.findActiveTokensByMemberId(memberId); - if (tokens.isEmpty()) return; + if (tokens == null || tokens.isEmpty()) { + return FcmResult.empty(); + } - // 2) 널 세이프 처리 + // 2) 널 세이프 final String _title = title == null ? "" : title; final String _body = body == null ? "" : body; - String type = (data != null && data.get("type") != null) ? data.get("type") : ""; - String refId = (data != null && data.get("refId") != null) ? data.get("refId") : ""; - String deeplink = (data != null && data.get("deeplink") != null) ? data.get("deeplink") : ""; - String notificationId = (data != null && data.get("notificationId") != null) ? data.get("notificationId") : ""; - - // 3) 각 토큰에 FCM 전송 (data-only + HIGH) - for (String token : tokens) { - Message msg = Message.builder() - .setToken(token) - .setAndroidConfig(AndroidConfig.builder() - .setPriority(AndroidConfig.Priority.HIGH) - // 필요 시 TTL 등 추가 가능 - // .setTtl(10_000L) // 10초 - .build()) - // --- data-only payload --- - .putData("title", _title) - .putData("body", _body) - .putData("type", type) - .putData("refId", refId) - .putData("deeplink", deeplink) - .putData("notificationId", notificationId) - .build(); - - try { - ApiFuture future = messaging.sendAsync(msg); - String messageId = future.get(SEND_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); - log.debug("[FCM] sent messageId={} memberId={}", messageId, memberId); - } catch (TimeoutException te) { - log.warn("[FCM] timeout ({} ms) memberId={}", SEND_TIMEOUT.toMillis(), memberId); - throw new RuntimeException("FCM timeout", te); - } catch (Exception e) { - throw new RuntimeException("FCM unexpected error", e); + String type = data != null ? data.getOrDefault("type", "") : ""; + String refId = data != null ? data.getOrDefault("refId", "") : ""; + String deeplink = data != null ? data.getOrDefault("deeplink", "") : ""; + String notificationId = data != null ? data.getOrDefault("notificationId", "") : ""; + + // 3) 멀티캐스트 메시지 구성 (data-only + HIGH) + com.google.firebase.messaging.MulticastMessage msg = + com.google.firebase.messaging.MulticastMessage.builder() + .addAllTokens(tokens) + .setAndroidConfig(AndroidConfig.builder() + .setPriority(AndroidConfig.Priority.HIGH) + .build()) + .putData("title", _title) + .putData("body", _body) + .putData("type", type) + .putData("refId", refId) + .putData("deeplink", deeplink) + .putData("notificationId", notificationId) + .build(); + + try { + ApiFuture future = messaging.sendEachForMulticastAsync(msg); + BatchResponse br = future.get(SEND_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + + int success = 0, fail = 0; + List invalidTokens = new ArrayList<>(); + + List responses = br.getResponses(); + for (int i = 0; i < responses.size(); i++) { + SendResponse r = responses.get(i); + if (r.isSuccessful()) { + success++; + } else { + fail++; + FirebaseMessagingException fme = r.getException(); // per-token 예외 + if (fme != null && ( + fme.getMessagingErrorCode() == MessagingErrorCode.UNREGISTERED || + fme.getMessagingErrorCode() == MessagingErrorCode.INVALID_ARGUMENT)) { + invalidTokens.add(tokens.get(i)); + } + log.warn("[FCM] per-token fail memberId={} idx={} code={} root={}", + memberId, i, + (fme != null ? fme.getMessagingErrorCode() : null), + rootSummary(fme)); + } + } + + if (!invalidTokens.isEmpty()) { + try { + tokenRepo.deactivateTokens(invalidTokens); // UPDATE ... SET active=0 WHERE token IN (...) + } catch (Exception e) { + log.error("[FCM] deactivateTokens failed size={} memberId={} root={}", + invalidTokens.size(), memberId, rootSummary(e), e); + } + } + + return new FcmResult(success, fail, invalidTokens); + + } catch (TimeoutException te) { + log.warn("[FCM] timeout ({} ms) memberId={}", SEND_TIMEOUT.toMillis(), memberId); + throw te; + + } catch (ExecutionException ee) { + // ★ 핵심: Future가 싸서 던진 예외를 원형으로 복원 + Throwable c = ee.getCause(); + if (c instanceof FirebaseMessagingException fme) { + log.error("[FCM] FirebaseMessagingException memberId={} http={} code={} root={}", + memberId, httpStatusOf(fme), fme.getMessagingErrorCode(), rootSummary(fme), fme); + throw fme; // 리스너에서 코드/HTTP 기반 분류 가능 } + throw ee; // 그 외는 그대로 위로 + + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw ie; } } + + public static Integer httpStatusOf(com.google.firebase.messaging.FirebaseMessagingException fme) { + try { + var resp = fme.getHttpResponse(); + return resp != null ? resp.getStatusCode() : null; + } catch (Throwable ignore) { return null; } + } + + private String rootSummary(Throwable t) { + if (t == null) return "null"; + Throwable r = t; while (r.getCause() != null) r = r.getCause(); + return r.getClass().getName() + ": " + String.valueOf(r.getMessage()); + } + + public record FcmResult(int successCount, int failureCount, List invalidTokens) { + static FcmResult empty() { return new FcmResult(0, 0, java.util.List.of()); } + } } \ No newline at end of file