Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import com.assu.server.domain.deviceToken.entity.DeviceToken;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.Optional;
Expand All @@ -12,5 +14,10 @@ public interface DeviceTokenRepository extends JpaRepository<DeviceToken, Long>
@Query("select dt.token from DeviceToken dt where dt.member.id =:memberId and dt.active=true")
List<String> findActiveTokensByMemberId(@Param("memberId") Long memberId);

@Transactional
@Modifying
@Query("update DeviceToken dt set dt.active = false where dt.token in :tokens")
void deactivateTokens(@Param("tokens") List<String> tokens);

Optional<DeviceToken> findByToken(String token);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,16 @@ public interface NotificationOutboxRepository extends JpaRepository<Notification
""")
int markSentById(@Param("id") Long id);

@org.springframework.data.jpa.repository.Modifying(clearAutomatically = true, flushAutomatically = true)
@org.springframework.data.jpa.repository.Query("""
update NotificationOutbox o
set o.status = com.assu.server.domain.notification.entity.NotificationOutbox.Status.FAILED
where o.id = :id
and o.status <> com.assu.server.domain.notification.entity.NotificationOutbox.Status.FAILED
""")
int markFailedById(@org.springframework.data.repository.query.Param("id") Long id);

boolean existsByIdAndStatus(Long id, NotificationOutbox.Status status);

List<NotificationOutbox> findTop50ByStatusOrderByIdAsc(NotificationOutbox.Status status);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,99 @@
import com.assu.server.infra.firebase.AmqpConfig;
import com.assu.server.infra.firebase.FcmClient;
import com.assu.server.domain.notification.dto.NotificationMessageDTO;

import com.google.firebase.messaging.FirebaseMessagingException;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RequiredArgsConstructor
public class NotificationListener {

private final FcmClient fcmClient;
private final OutboxStatusService outboxStatus; // ← 주입
private final OutboxStatusService outboxStatus;

@RabbitListener(queues = AmqpConfig.QUEUE, ackMode = "MANUAL")
public void onMessage(@Payload NotificationMessageDTO payload,
Channel ch,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {

final Long outboxId = safeParseLong(payload.getIdempotencyKey());

try {
fcmClient.sendToMemberId(payload.getReceiverId(), payload.getTitle(), payload.getBody(), payload.getData());
Long outboxId = Long.valueOf(payload.getIdempotencyKey());
outboxStatus.markSent(outboxId); // ★ ACK 전에 완료 표시

ch.basicAck(tag, false); // ★ 맨 마지막에 단 한 번만 ACK
} catch (RuntimeException e) {
if (isTransient(e)) ch.basicNack(tag, false, true);
else ch.basicNack(tag, false, false);
// 0) Outbox 선확인: 이미 처리되었으면 중복 전송/SELECT 자체를 스킵
if (outboxId != null && outboxStatus.isAlreadySent(outboxId)) {
log.debug("[Notify] already-sent outboxId={}, ACK", outboxId);
ch.basicAck(tag, false);
return;
}

// 1) 전송
FcmClient.FcmResult result = fcmClient.sendToMemberId(
payload.getReceiverId(), payload.getTitle(), payload.getBody(), payload.getData());

// 2) 성공 처리
if (outboxId != null) outboxStatus.markSent(outboxId);
ch.basicAck(tag, false);

// 3) 관측용 로그
log.info("[Notify] sent outboxId={} memberId={} success={} fail={} invalidTokens={}",
outboxId, payload.getReceiverId(),
result.successCount(), result.failureCount(), result.invalidTokens());

} catch (FirebaseMessagingException fme) {
boolean permanent = isPermanent(fme);
log.error("[Notify] FCM failure outboxId={} memberId={} permanent={} http={} code={} root={}",
outboxId, payload.getReceiverId(), permanent,
FcmClient.httpStatusOf(fme), fme.getMessagingErrorCode(), rootSummary(fme), fme);

if (outboxId != null) outboxStatus.markFailed(outboxId);
ch.basicNack(tag, false, false); // requeue 금지(지연 재시도 큐 권장)

} catch (java.net.UnknownHostException | javax.net.ssl.SSLHandshakeException e) {
// 환경 문제(DNS/CA)는 영구 취급(루프 방지)
log.error("[Notify] ENV failure outboxId={} memberId={} root={}",
outboxId, payload.getReceiverId(), rootSummary(e), e);
if (outboxId != null) outboxStatus.markFailed(outboxId);
ch.basicNack(tag, false, false);

} catch (java.util.concurrent.TimeoutException | java.net.SocketTimeoutException e) {
// 타임아웃 → 일시 장애. 그래도 requeue(true)는 쓰지 않음
log.warn("[Notify] TIMEOUT outboxId={} memberId={} root={}",
outboxId, payload.getReceiverId(), rootSummary(e), e);
if (outboxId != null) outboxStatus.markFailed(outboxId);
ch.basicNack(tag, false, false);

} catch (Exception e) {
// 알 수 없는 오류 → DLQ
log.error("[Notify] UNKNOWN failure outboxId={} memberId={} root={}",
outboxId, payload.getReceiverId(), rootSummary(e), e);
if (outboxId != null) outboxStatus.markFailed(outboxId);
ch.basicNack(tag, false, false);
}
}

private boolean isTransient(Throwable t) {
while (t != null) {
if (t instanceof java.util.concurrent.TimeoutException
|| t instanceof java.net.SocketTimeoutException
|| t instanceof java.io.IOException) {
return true;
}
t = t.getCause();
}
private boolean isPermanent(FirebaseMessagingException fme) {
var code = fme.getMessagingErrorCode();
Integer http = FcmClient.httpStatusOf(fme);
if (code == com.google.firebase.messaging.MessagingErrorCode.UNREGISTERED
|| code == com.google.firebase.messaging.MessagingErrorCode.INVALID_ARGUMENT) return true;
if (http != null && (http == 401 || http == 403)) return true;
return false;
}

private String rootSummary(Throwable t) {
Throwable r = t; while (r.getCause() != null) r = r.getCause();
return r.getClass().getName() + ": " + String.valueOf(r.getMessage());
}

private Long safeParseLong(String s) {
try { return s == null ? null : Long.valueOf(s); } catch (Exception ignore) { return null; }
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.assu.server.domain.notification.service;

import com.assu.server.domain.notification.entity.NotificationOutbox;
import com.assu.server.domain.notification.repository.NotificationOutboxRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -24,4 +25,16 @@ public void markSent(Long id) {
int updated = repo.markSentById(id);
log.info("[OutboxStatus] SENT updated={} outboxId={}", updated, id);
}

@Transactional(readOnly = true)
public boolean isAlreadySent(Long id) {
return repo.existsByIdAndStatus(id, NotificationOutbox.Status.SENT);
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void markFailed(Long id) {
int updated = repo.markFailedById(id);
log.info("[OutboxStatus] FAILED updated={} outboxId={}", updated, id);
}

}
152 changes: 109 additions & 43 deletions src/main/java/com/assu/server/infra/firebase/FcmClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,141 @@

import com.assu.server.domain.deviceToken.repository.DeviceTokenRepository;
import com.google.api.core.ApiFuture;
import com.google.firebase.messaging.AndroidConfig;
import com.google.firebase.messaging.FirebaseMessaging;
import com.google.firebase.messaging.Message;
import com.google.firebase.messaging.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Slf4j
@Component
@RequiredArgsConstructor
public class FcmClient {

private final FirebaseMessaging messaging;
private final DeviceTokenRepository tokenRepo;

// 전송 타임아웃
private static final Duration SEND_TIMEOUT = Duration.ofSeconds(3);
// 운영에서 3s는 다소 공격적 — 5s 정도 권장
private static final Duration SEND_TIMEOUT = Duration.ofSeconds(5);

public void sendToMemberId(Long memberId, String title, String body, Map<String, String> data) {
if (memberId == null) {
throw new IllegalArgumentException("receiverId is null");
}
/**
* 멤버의 활성 토큰 전체에 멀티캐스트 전송.
* - 실패 토큰(UNREGISTERED/INVALID_ARGUMENT)은 즉시 비활성화
* - 결과 요약을 반환
*/
public FcmResult sendToMemberId(Long memberId, String title, String body, Map<String, String> data)
throws TimeoutException, InterruptedException, FirebaseMessagingException, ExecutionException {
if (memberId == null) throw new IllegalArgumentException("receiverId is null");

// 1) 토큰 조회
List<String> tokens = tokenRepo.findActiveTokensByMemberId(memberId);
if (tokens.isEmpty()) return;
if (tokens == null || tokens.isEmpty()) {
return FcmResult.empty();
}

// 2) 널 세이프 처리
// 2) 널 세이프
final String _title = title == null ? "" : title;
final String _body = body == null ? "" : body;

String type = (data != null && data.get("type") != null) ? data.get("type") : "";
String refId = (data != null && data.get("refId") != null) ? data.get("refId") : "";
String deeplink = (data != null && data.get("deeplink") != null) ? data.get("deeplink") : "";
String notificationId = (data != null && data.get("notificationId") != null) ? data.get("notificationId") : "";

// 3) 각 토큰에 FCM 전송 (data-only + HIGH)
for (String token : tokens) {
Message msg = Message.builder()
.setToken(token)
.setAndroidConfig(AndroidConfig.builder()
.setPriority(AndroidConfig.Priority.HIGH)
// 필요 시 TTL 등 추가 가능
// .setTtl(10_000L) // 10초
.build())
// --- data-only payload ---
.putData("title", _title)
.putData("body", _body)
.putData("type", type)
.putData("refId", refId)
.putData("deeplink", deeplink)
.putData("notificationId", notificationId)
.build();

try {
ApiFuture<String> future = messaging.sendAsync(msg);
String messageId = future.get(SEND_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
log.debug("[FCM] sent messageId={} memberId={}", messageId, memberId);
} catch (TimeoutException te) {
log.warn("[FCM] timeout ({} ms) memberId={}", SEND_TIMEOUT.toMillis(), memberId);
throw new RuntimeException("FCM timeout", te);
} catch (Exception e) {
throw new RuntimeException("FCM unexpected error", e);
String type = data != null ? data.getOrDefault("type", "") : "";
String refId = data != null ? data.getOrDefault("refId", "") : "";
String deeplink = data != null ? data.getOrDefault("deeplink", "") : "";
String notificationId = data != null ? data.getOrDefault("notificationId", "") : "";

// 3) 멀티캐스트 메시지 구성 (data-only + HIGH)
com.google.firebase.messaging.MulticastMessage msg =
com.google.firebase.messaging.MulticastMessage.builder()
.addAllTokens(tokens)
.setAndroidConfig(AndroidConfig.builder()
.setPriority(AndroidConfig.Priority.HIGH)
.build())
.putData("title", _title)
.putData("body", _body)
.putData("type", type)
.putData("refId", refId)
.putData("deeplink", deeplink)
.putData("notificationId", notificationId)
.build();

try {
ApiFuture<BatchResponse> future = messaging.sendEachForMulticastAsync(msg);
BatchResponse br = future.get(SEND_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);

int success = 0, fail = 0;
List<String> invalidTokens = new ArrayList<>();

List<SendResponse> responses = br.getResponses();
for (int i = 0; i < responses.size(); i++) {
SendResponse r = responses.get(i);
if (r.isSuccessful()) {
success++;
} else {
fail++;
FirebaseMessagingException fme = r.getException(); // per-token 예외
if (fme != null && (
fme.getMessagingErrorCode() == MessagingErrorCode.UNREGISTERED ||
fme.getMessagingErrorCode() == MessagingErrorCode.INVALID_ARGUMENT)) {
invalidTokens.add(tokens.get(i));
}
log.warn("[FCM] per-token fail memberId={} idx={} code={} root={}",
memberId, i,
(fme != null ? fme.getMessagingErrorCode() : null),
rootSummary(fme));
}
}

if (!invalidTokens.isEmpty()) {
try {
tokenRepo.deactivateTokens(invalidTokens); // UPDATE ... SET active=0 WHERE token IN (...)
} catch (Exception e) {
log.error("[FCM] deactivateTokens failed size={} memberId={} root={}",
invalidTokens.size(), memberId, rootSummary(e), e);
}
}

return new FcmResult(success, fail, invalidTokens);

} catch (TimeoutException te) {
log.warn("[FCM] timeout ({} ms) memberId={}", SEND_TIMEOUT.toMillis(), memberId);
throw te;

} catch (ExecutionException ee) {
// ★ 핵심: Future가 싸서 던진 예외를 원형으로 복원
Throwable c = ee.getCause();
if (c instanceof FirebaseMessagingException fme) {
log.error("[FCM] FirebaseMessagingException memberId={} http={} code={} root={}",
memberId, httpStatusOf(fme), fme.getMessagingErrorCode(), rootSummary(fme), fme);
throw fme; // 리스너에서 코드/HTTP 기반 분류 가능
}
throw ee; // 그 외는 그대로 위로

} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw ie;
}
}

public static Integer httpStatusOf(com.google.firebase.messaging.FirebaseMessagingException fme) {
try {
var resp = fme.getHttpResponse();
return resp != null ? resp.getStatusCode() : null;
} catch (Throwable ignore) { return null; }
}

private String rootSummary(Throwable t) {
if (t == null) return "null";
Throwable r = t; while (r.getCause() != null) r = r.getCause();
return r.getClass().getName() + ": " + String.valueOf(r.getMessage());
}

public record FcmResult(int successCount, int failureCount, List<String> invalidTokens) {
static FcmResult empty() { return new FcmResult(0, 0, java.util.List.of()); }
}
}