Skip to content

Commit 0160ab2

Browse files
authored
Merge pull request #17 from Pinit-Scheduler/feat/알림-배치처리-적용
Feat/알림 배치처리 적용
2 parents 4f30c80 + 8f93b53 commit 0160ab2

15 files changed

Lines changed: 381 additions & 39 deletions
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package me.pinitnotification.application.notification;
2+
3+
import me.pinitnotification.domain.notification.UpcomingScheduleNotification;
4+
5+
import java.util.List;
6+
7+
public record NotificationDispatchItem(
8+
UpcomingScheduleNotification notification,
9+
List<String> tokens
10+
) {
11+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package me.pinitnotification.application.notification;
2+
3+
import java.time.Instant;
4+
import java.util.List;
5+
6+
public interface NotificationDispatchQueryRepository {
7+
List<NotificationDispatchItem> findAllDueNotificationsWithTokens(Instant now);
8+
}
Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,85 @@
11
package me.pinitnotification.application.notification;
22

3+
import me.pinitnotification.application.push.PushSendResult;
4+
import me.pinitnotification.application.push.PushService;
35
import me.pinitnotification.domain.notification.UpcomingScheduleNotification;
46
import me.pinitnotification.domain.notification.UpcomingScheduleNotificationRepository;
5-
import me.pinitnotification.domain.push.PushSubscription;
6-
import me.pinitnotification.domain.push.PushSubscriptionRepository;
7-
import me.pinitnotification.application.push.PushService;
87
import org.slf4j.Logger;
98
import org.slf4j.LoggerFactory;
109
import org.springframework.scheduling.annotation.Scheduled;
1110
import org.springframework.stereotype.Service;
1211
import org.springframework.transaction.annotation.Transactional;
1312

1413
import java.time.Clock;
15-
import java.time.OffsetDateTime;
16-
import java.time.format.DateTimeParseException;
14+
import java.time.Instant;
15+
import java.util.HashSet;
1716
import java.util.List;
17+
import java.util.Set;
1818

1919
@Service
2020
public class NotificationDispatchScheduler {
2121
private static final Logger log = LoggerFactory.getLogger(NotificationDispatchScheduler.class);
2222

2323
private final UpcomingScheduleNotificationRepository notificationRepository;
24-
private final PushSubscriptionRepository pushSubscriptionRepository;
24+
private final NotificationDispatchQueryRepository dispatchQueryRepository;
25+
private final PushTokenCleanupService pushTokenCleanupService;
2526
private final PushService pushService;
2627
private final Clock clock;
2728

2829
public NotificationDispatchScheduler(UpcomingScheduleNotificationRepository notificationRepository,
29-
PushSubscriptionRepository pushSubscriptionRepository,
30+
NotificationDispatchQueryRepository dispatchQueryRepository,
31+
PushTokenCleanupService pushTokenCleanupService,
3032
PushService pushService,
3133
Clock clock) {
3234
this.notificationRepository = notificationRepository;
33-
this.pushSubscriptionRepository = pushSubscriptionRepository;
35+
this.dispatchQueryRepository = dispatchQueryRepository;
36+
this.pushTokenCleanupService = pushTokenCleanupService;
3437
this.pushService = pushService;
3538
this.clock = clock;
3639
}
3740

3841
@Scheduled(cron = "0 */10 * * * *")
3942
@Transactional
4043
public void dispatchDueNotifications() {
41-
OffsetDateTime now = OffsetDateTime.now(clock);
42-
List<UpcomingScheduleNotification> dueNotifications = notificationRepository.findAll().stream()
43-
.filter(notification -> notification.isDue(now))
44-
.toList();
44+
Instant now = Instant.now(clock);
45+
List<NotificationDispatchItem> dispatchItems = dispatchQueryRepository.findAllDueNotificationsWithTokens(now);
4546

46-
if (dueNotifications.isEmpty()) {
47+
if (dispatchItems.isEmpty()) {
4748
return;
4849
}
4950

50-
dueNotifications.forEach(this::sendNotificationToOwner);
51-
notificationRepository.deleteAllInBatch(dueNotifications);
51+
Set<String> tokensToDelete = new HashSet<>();
52+
dispatchItems.forEach(item -> sendNotificationToOwner(item, tokensToDelete));
53+
notificationRepository.deleteAllInBatch(dispatchItems.stream().map(NotificationDispatchItem::notification).toList());
54+
55+
if (!tokensToDelete.isEmpty()) {
56+
deleteTokensSafely(tokensToDelete);
57+
}
5258
}
5359

5460

55-
private void sendNotificationToOwner(UpcomingScheduleNotification notification) {
56-
List<PushSubscription> subscriptions = pushSubscriptionRepository.findAllByMemberId(notification.getOwnerId());
57-
if (subscriptions.isEmpty()) {
61+
private void sendNotificationToOwner(NotificationDispatchItem dispatchItem, Set<String> tokensToDelete) {
62+
UpcomingScheduleNotification notification = dispatchItem.notification();
63+
List<String> tokens = dispatchItem.tokens();
64+
65+
if (tokens.isEmpty()) {
5866
log.info("No push tokens for owner; skip sending. ownerId={}, scheduleId={}", notification.getOwnerId(), notification.getScheduleId());
5967
return;
6068
}
6169

62-
subscriptions.forEach(subscription -> pushService.sendPushMessage(subscription.getToken(), notification));
70+
tokens.forEach(token -> {
71+
PushSendResult result = pushService.sendPushMessage(token, notification);
72+
if (result.shouldDeleteToken()) {
73+
tokensToDelete.add(token);
74+
}
75+
});
76+
}
77+
78+
private void deleteTokensSafely(Set<String> tokensToDelete) {
79+
try {
80+
pushTokenCleanupService.deleteTokensInNewTransaction(tokensToDelete);
81+
} catch (Exception ex) {
82+
log.warn("Failed to delete invalid push tokens; notifications already removed. tokens={}", tokensToDelete.size(), ex);
83+
}
6384
}
6485
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package me.pinitnotification.application.notification;
2+
3+
import me.pinitnotification.domain.push.PushSubscriptionRepository;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.springframework.stereotype.Service;
7+
import org.springframework.transaction.annotation.Propagation;
8+
import org.springframework.transaction.annotation.Transactional;
9+
10+
import java.util.Set;
11+
12+
@Service
13+
public class PushTokenCleanupService {
14+
private static final Logger log = LoggerFactory.getLogger(PushTokenCleanupService.class);
15+
private final PushSubscriptionRepository pushSubscriptionRepository;
16+
17+
public PushTokenCleanupService(PushSubscriptionRepository pushSubscriptionRepository) {
18+
this.pushSubscriptionRepository = pushSubscriptionRepository;
19+
}
20+
21+
@Transactional(propagation = Propagation.REQUIRES_NEW)
22+
public void deleteTokensInNewTransaction(Set<String> tokens) {
23+
if (tokens == null || tokens.isEmpty()) {
24+
return;
25+
}
26+
pushSubscriptionRepository.deleteByTokens(tokens);
27+
log.info("Deleted {} invalid push tokens", tokens.size());
28+
}
29+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package me.pinitnotification.application.push;
2+
3+
public record PushSendResult(
4+
boolean success,
5+
boolean invalidToken
6+
) {
7+
public static PushSendResult successResult() {
8+
return new PushSendResult(true, false);
9+
}
10+
11+
public static PushSendResult invalidTokenResult() {
12+
return new PushSendResult(false, true);
13+
}
14+
15+
public static PushSendResult failedResult() {
16+
return new PushSendResult(false, false);
17+
}
18+
19+
public boolean shouldDeleteToken() {
20+
return invalidToken;
21+
}
22+
}

src/main/java/me/pinitnotification/application/push/PushService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ public interface PushService {
88
void subscribe(Long memberId, String deviceId, String token);
99

1010
void unsubscribe(Long memberId, String deviceId, String token);
11-
void sendPushMessage(String token, Notification notification);
11+
12+
PushSendResult sendPushMessage(String token, Notification notification);
1213

1314
boolean isSubscribed(Long memberId, String deviceId);
1415
}

src/main/java/me/pinitnotification/domain/push/PushSubscriptionRepository.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package me.pinitnotification.domain.push;
22

3+
import java.util.Collection;
34
import java.util.List;
45
import java.util.Optional;
56

@@ -12,5 +13,7 @@ public interface PushSubscriptionRepository {
1213

1314
void deleteByToken(String token);
1415

16+
void deleteByTokens(Collection<String> tokens);
17+
1518
void deleteByMemberIdAndDeviceId(Long memberId, String deviceId);
1619
}

src/main/java/me/pinitnotification/infrastructure/fcm/FcmService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.google.firebase.messaging.Message;
66
import com.google.firebase.messaging.MessagingErrorCode;
77
import lombok.extern.slf4j.Slf4j;
8+
import me.pinitnotification.application.push.PushSendResult;
89
import me.pinitnotification.application.push.PushService;
910
import me.pinitnotification.domain.notification.Notification;
1011
import me.pinitnotification.domain.push.PushSubscription;
@@ -34,20 +35,21 @@ public FcmService(FirebaseMessaging firebaseMessaging,
3435
}
3536

3637
@Override
37-
public void sendPushMessage(String token, Notification notification) {
38+
public PushSendResult sendPushMessage(String token, Notification notification) {
3839
log.info("publish token: {}", token);
3940
Message message = Message.builder()
4041
.setToken(token)
4142
.putAllData(notification.getData())
4243
.build();
4344
try {
4445
firebaseMessaging.send(message);
46+
return PushSendResult.successResult();
4547
} catch (FirebaseMessagingException e) {
4648
log.error(e.getMessage(), e);
4749
if (e.getMessagingErrorCode() == MessagingErrorCode.UNREGISTERED || e.getMessagingErrorCode() == MessagingErrorCode.INVALID_ARGUMENT) {
48-
// Todo 토큰 삭제 방식 변경 필요
49-
pushSubscriptionRepository.deleteByToken(token);
50+
return PushSendResult.invalidTokenResult();
5051
}
52+
return PushSendResult.failedResult();
5153
}
5254
}
5355

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package me.pinitnotification.infrastructure.persistence.notification;
2+
3+
import me.pinitnotification.application.notification.NotificationDispatchItem;
4+
import me.pinitnotification.application.notification.NotificationDispatchQueryRepository;
5+
import me.pinitnotification.domain.notification.UpcomingScheduleNotification;
6+
import org.springframework.jdbc.core.simple.JdbcClient;
7+
import org.springframework.stereotype.Repository;
8+
9+
import java.time.Instant;
10+
import java.util.*;
11+
12+
@Repository
13+
public class NotificationDispatchQueryRepositoryAdapter implements NotificationDispatchQueryRepository {
14+
private static final String FIND_DUE_WITH_TOKENS_SQL = """
15+
SELECT n.public_id,
16+
n.owner_id,
17+
n.schedule_id,
18+
n.schedule_title,
19+
n.schedule_start_time,
20+
n.idempotent_key,
21+
ps.token
22+
FROM upcoming_schedule_notification n
23+
LEFT JOIN push_subscription ps ON ps.member_id = n.owner_id
24+
WHERE n.schedule_start_time IS NOT NULL
25+
AND n.schedule_start_time <= ?
26+
""";
27+
28+
private final JdbcClient jdbcClient;
29+
30+
public NotificationDispatchQueryRepositoryAdapter(JdbcClient jdbcClient) {
31+
this.jdbcClient = jdbcClient;
32+
}
33+
34+
@Override
35+
public List<NotificationDispatchItem> findAllDueNotificationsWithTokens(Instant now) {
36+
List<DispatchRow> rows = jdbcClient.sql(FIND_DUE_WITH_TOKENS_SQL)
37+
.param(now.toString())
38+
.query((rs, rowNum) -> new DispatchRow(
39+
UUID.fromString(rs.getString("public_id")),
40+
rs.getLong("owner_id"),
41+
rs.getLong("schedule_id"),
42+
rs.getString("schedule_title"),
43+
rs.getString("schedule_start_time"),
44+
rs.getString("idempotent_key"),
45+
rs.getString("token")
46+
))
47+
.list();
48+
49+
if (rows.isEmpty()) {
50+
return List.of();
51+
}
52+
53+
Map<UUID, DispatchAccumulator> aggregated = new LinkedHashMap<>();
54+
for (DispatchRow row : rows) {
55+
DispatchAccumulator accumulator = aggregated.computeIfAbsent(
56+
row.notificationId,
57+
id -> new DispatchAccumulator(
58+
toDomain(row),
59+
new ArrayList<>()
60+
)
61+
);
62+
if (row.token != null) {
63+
accumulator.tokens.add(row.token);
64+
}
65+
}
66+
67+
return aggregated.values().stream()
68+
.map(accumulator -> new NotificationDispatchItem(accumulator.notification, List.copyOf(accumulator.tokens)))
69+
.toList();
70+
}
71+
72+
private UpcomingScheduleNotification toDomain(DispatchRow row) {
73+
return new UpcomingScheduleNotification(
74+
row.notificationId,
75+
row.ownerId,
76+
row.scheduleId,
77+
row.scheduleTitle,
78+
row.scheduleStartTime,
79+
row.idempotentKey
80+
);
81+
}
82+
83+
private record DispatchRow(
84+
UUID notificationId,
85+
Long ownerId,
86+
Long scheduleId,
87+
String scheduleTitle,
88+
String scheduleStartTime,
89+
String idempotentKey,
90+
String token
91+
) {
92+
}
93+
94+
private static class DispatchAccumulator {
95+
private final UpcomingScheduleNotification notification;
96+
private final List<String> tokens;
97+
98+
private DispatchAccumulator(UpcomingScheduleNotification notification, List<String> tokens) {
99+
this.notification = notification;
100+
this.tokens = tokens;
101+
}
102+
}
103+
}

src/main/java/me/pinitnotification/infrastructure/persistence/notification/UpcomingScheduleNotificationEntity.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
uniqueConstraints = {
1616
@UniqueConstraint(name = "uk_schedule_owner", columnNames = {"schedule_id", "owner_id"}),
1717
@UniqueConstraint(name = "uk_idempotent_key", columnNames = {"idempotent_key"})
18+
},
19+
indexes = {
20+
@Index(name = "idx_upcoming_start_time", columnList = "schedule_start_time")
1821
}
1922
)
2023
@Getter

0 commit comments

Comments
 (0)