diff --git a/src/main/java/com/newzet/api/article/repository/ArticleRepositoryImpl.java b/src/main/java/com/newzet/api/article/repository/ArticleRepositoryImpl.java index 6cdb97eb..28931301 100644 --- a/src/main/java/com/newzet/api/article/repository/ArticleRepositoryImpl.java +++ b/src/main/java/com/newzet/api/article/repository/ArticleRepositoryImpl.java @@ -52,11 +52,7 @@ public List saveAll(List articleEntityDtoLis int processedArticles = 0; for (ArticleEntity entityToSave : entitiesToSave) { - try { - entityManager.persist(entityToSave); - } catch (Exception e) { - log.error("Failed to persist ArticleEntity: {}", entityToSave, e); - } + entityManager.persist(entityToSave); result.add(entityToSave.toEntityDto()); processedArticles++; if (processedArticles % BATCH_SIZE == 0) { diff --git a/src/main/java/com/newzet/api/article/repository/batch/ArticleRedisBatchConsumerImpl.java b/src/main/java/com/newzet/api/article/repository/batch/ArticleRedisBatchConsumerImpl.java index 7a2976a8..2f4ecd4e 100644 --- a/src/main/java/com/newzet/api/article/repository/batch/ArticleRedisBatchConsumerImpl.java +++ b/src/main/java/com/newzet/api/article/repository/batch/ArticleRedisBatchConsumerImpl.java @@ -5,7 +5,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.core.RedisTemplate; @@ -15,7 +15,6 @@ import com.newzet.api.article.business.dto.ArticleEntityDto; import com.newzet.api.article.business.repository.ArticleRepository; import com.newzet.api.article.domain.Article; -import com.newzet.api.article.repository.batch.dto.BatchProcessingResult; import com.newzet.api.article.repository.batch.dto.BatchSaveData; import com.newzet.api.common.batch.RedisBatchConsumer; import com.newzet.api.common.batch.config.BatchConfig; @@ -34,6 +33,7 @@ public class ArticleRedisBatchConsumerImpl extends RedisBatchConsumer
private static final String CONSUMER_NAME = "article-processor"; private static final String ARTICLE_DUPLICATE_CACHE_PREFIX = "article:dup:"; private static final long DUPLICATE_CACHE_TTL_MINUTES = 10; + private static final int REDIS_BATCH_SIZE = 50; private final ArticleRepository articleRepository; private final FcmSenderOrchestrator fcmSenderOrchestrator; @@ -77,154 +77,182 @@ protected Class
getItemClass() { @Override protected void processBatchItems(List
articles) { long startTime = System.currentTimeMillis(); - BatchProcessingResult result = new BatchProcessingResult(); - Map> keyToArticlesMap = prepareArticlesWithCacheKeys( - articles, result); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger duplicateCount = new AtomicInteger(0); + AtomicInteger failCount = new AtomicInteger(0); + AtomicInteger cacheHitCount = new AtomicInteger(0); - if (keyToArticlesMap.isEmpty()) { - log.info("No articles to process"); + Map uniqueArticlesMap = removeBatchDuplicates(articles, + duplicateCount, failCount); + + if (uniqueArticlesMap.isEmpty()) { + log.info("No articles to process after batch deduplication"); return; } - BatchSaveData saveData = identifyUniqueArticles(keyToArticlesMap, result); + BatchSaveData saveData = identifyUniqueArticlesWithBatch(uniqueArticlesMap, + duplicateCount, cacheHitCount, failCount); + + List savedArticles = saveToDatabaseOptimized(saveData.toSave(), + successCount, failCount); - saveToDatabaseAndUpdateCounters(saveData.toSave(), result); + updateRedisCacheAsync(saveData.toCache()); - updateRedisCache(saveData.toCache()); + sendFCMAsync(savedArticles); long duration = System.currentTimeMillis() - startTime; - logBatchSummary(articles.size(), result, duration); + logBatchSummary(articles.size(), successCount.get(), duplicateCount.get(), + cacheHitCount.get(), failCount.get(), duration); } - private Map> prepareArticlesWithCacheKeys( - List
articles, BatchProcessingResult result) { - Map> keyToArticlesMap = new HashMap<>(); + private Map removeBatchDuplicates(List
articles, + AtomicInteger duplicateCount, AtomicInteger failCount) { + + Map uniqueArticlesMap = new HashMap<>(); for (Article article : articles) { try { ArticleEntityDto entityDto = ArticleEntityDto.fromDomain(article); + String cacheKey = generateOptimizedCacheKey(entityDto); - String cacheKey = generateSimpleCacheKey( - entityDto.getFromName(), - entityDto.getFromDomain(), - entityDto.getTitle(), - entityDto.getToUserId() - ); + if (uniqueArticlesMap.containsKey(cacheKey)) { + duplicateCount.incrementAndGet(); + } else { + uniqueArticlesMap.put(cacheKey, entityDto); + } - keyToArticlesMap.computeIfAbsent(cacheKey, k -> new ArrayList<>()) - .add(entityDto); } catch (Exception e) { - result.incrementFailCount(); - log.error("Failed to process article: {}, error: {}", article.getTitle(), - e.getMessage(), e); + failCount.incrementAndGet(); + log.error("Failed to process article: {}", article.getTitle(), e); } } - return keyToArticlesMap; + return uniqueArticlesMap; } - private BatchSaveData identifyUniqueArticles( - Map> keyToArticlesMap, BatchProcessingResult result) { + private BatchSaveData identifyUniqueArticlesWithBatch( + Map uniqueArticlesMap, + AtomicInteger duplicateCount, AtomicInteger cacheHitCount, AtomicInteger failCount) { + List toSave = new ArrayList<>(); Map toCache = new HashMap<>(); - for (Map.Entry> entry : keyToArticlesMap.entrySet()) { - String cacheKey = entry.getKey(); - List entitiesWithSameKey = entry.getValue(); - - if (entitiesWithSameKey.size() > 1) { - int batchDuplicates = entitiesWithSameKey.size() - 1; - result.incrementBatchDuplicateCount(batchDuplicates); - log.info("BATCH DUPLICATES: {} articles with same key '{}'", - entitiesWithSameKey.size(), cacheKey); - } - - ArticleEntityDto entityDto = entitiesWithSameKey.get(0); + List cacheKeys = new ArrayList<>(uniqueArticlesMap.keySet()); - String cachedValue = redisTemplate.opsForValue().get(cacheKey); + for (int i = 0; i < cacheKeys.size(); i += REDIS_BATCH_SIZE) { + int endIndex = Math.min(i + REDIS_BATCH_SIZE, cacheKeys.size()); + List batchKeys = cacheKeys.subList(i, endIndex); - if (cachedValue != null) { - result.incrementDuplicateCount(); - result.incrementCacheHitCount(); - log.info( - "REDIS DUPLICATE: '{}' with key '{}', counters: duplicate={}, cacheHit={}", - entityDto.getTitle(), cacheKey, result.getDuplicateCount(), - result.getCacheHitCount()); - } else { - toSave.add(entityDto); - markForCaching(toCache, cacheKey); + try { + List cachedValues = redisTemplate.opsForValue().multiGet(batchKeys); + + for (int j = 0; j < batchKeys.size(); j++) { + String cacheKey = batchKeys.get(j); + String cachedValue = cachedValues.get(j); + ArticleEntityDto entityDto = uniqueArticlesMap.get(cacheKey); + + if (cachedValue != null) { + duplicateCount.incrementAndGet(); + cacheHitCount.incrementAndGet(); + } else { + toSave.add(entityDto); + toCache.put(cacheKey, "1"); + } + } + } catch (Exception e) { + log.error("Redis batch query failed: {}", e.getMessage()); + failCount.addAndGet(batchKeys.size()); + // Redis 실패 시 모든 아티클을 저장 대상으로 (안전장치) + for (String key : batchKeys) { + toSave.add(uniqueArticlesMap.get(key)); + toCache.put(key, "1"); + } } } - result.addToDuplicateCount(result.getBatchDuplicateCount()); - return new BatchSaveData(toSave, toCache); } - private void saveToDatabaseAndUpdateCounters(List toSave, - BatchProcessingResult result) { - if (!toSave.isEmpty()) { - try { - List saved = articleRepository.saveAll(toSave); - result.setSuccessCount(saved.size()); - sendFCM(saved); // fcm 메시지 전송 배치처리 - } catch (Exception e) { - result.incrementFailCount(toSave.size()); - log.error("SAVE FAILED for {} articles: {}", toSave.size(), e.getMessage(), e); - } - } else { - log.info("No new articles to save (all are duplicates or failed)"); + private List saveToDatabaseOptimized(List toSave, + AtomicInteger successCount, AtomicInteger failCount) { + + if (toSave.isEmpty()) { + return List.of(); } + + List saved = articleRepository.saveAll(toSave); + successCount.addAndGet(saved.size()); + log.info("Successfully saved {} articles to database", saved.size()); + return saved; } - private void sendFCM(List saved) { - if (!saved.isEmpty()) { - for (ArticleEntityDto articleData : saved) { - fcmSenderOrchestrator.sendFcmWhenMailReceivedBatch(articleData.getToUserId(), - articleData.getId(), articleData.getCreatedAt(), - articleData.getTitle(), articleData.getFromName()); - } + private void updateRedisCacheAsync(Map toCache) { + if (toCache.isEmpty()) { + return; } - } - private void updateRedisCache(Map toCache) { - if (!toCache.isEmpty()) { - for (Map.Entry entry : toCache.entrySet()) { - try { - String key = entry.getKey(); - redisTemplate.opsForValue().setIfAbsent( - key, entry.getValue(), Duration.ofMinutes(DUPLICATE_CACHE_TTL_MINUTES)); - } catch (Exception e) { - log.error("Cache update ERROR for key '{}': {}", entry.getKey(), - e.getMessage()); + reactiveRedisTemplate.opsForValue() + .multiSet(toCache) + .doOnSuccess(result -> { + if (Boolean.TRUE.equals(result)) { + log.debug("Cache updated for {} keys", toCache.size()); } + }) + .doOnError(error -> log.error("Cache update failed: {}", error.getMessage())) + .subscribe(); + + for (String key : toCache.keySet()) { + reactiveRedisTemplate.expire(key, Duration.ofMinutes(DUPLICATE_CACHE_TTL_MINUTES)) + .doOnError( + error -> log.warn("TTL setting failed for key {}: {}", key, error.getMessage())) + .subscribe(); + } + } + + private void sendFCMAsync(List savedArticles) { + if (savedArticles.isEmpty()) { + return; + } + + for (ArticleEntityDto articleData : savedArticles) { + try { + fcmSenderOrchestrator.sendFcmWhenMailReceivedBatch( + articleData.getToUserId(), + articleData.getId(), + articleData.getCreatedAt(), + articleData.getTitle(), + articleData.getFromName() + ); + } catch (Exception e) { + log.error("FCM send failed for article {}: {}", articleData.getId(), + e.getMessage()); } } } - private void logBatchSummary(int totalArticles, BatchProcessingResult result, long duration) { - log.info( - "ARTICLE BATCH SUMMARY: total={}, success={}, duplicate={}, cacheHit={}, failed={}, elapsed={}ms", - totalArticles, result.getSuccessCount(), result.getDuplicateCount(), - result.getCacheHitCount(), result.getFailCount(), duration); + private String generateOptimizedCacheKey(ArticleEntityDto entityDto) { + String fromName = normalizeString(entityDto.getFromName()); + String fromDomain = normalizeString(entityDto.getFromDomain()); + String title = normalizeString(entityDto.getTitle()); + String userId = entityDto.getToUserId().toString().substring(0, 8); + + return ARTICLE_DUPLICATE_CACHE_PREFIX + + fromName + ":" + + fromDomain + ":" + + userId + ":" + + Math.abs(title.hashCode()); } - private void markForCaching(Map cacheMap, String cacheKey) { - cacheMap.put(cacheKey, "1"); + private String normalizeString(String input) { + return input != null ? input.trim().toLowerCase() : ""; } - private String generateSimpleCacheKey(String fromName, String fromDomain, String title, - UUID toUserId) { - fromName = fromName != null ? fromName.trim().toLowerCase() : ""; - fromDomain = fromDomain != null ? fromDomain.trim().toLowerCase() : ""; - String normalizedTitle = title != null ? title.trim().toLowerCase() : ""; - String userIdStr = toUserId != null ? toUserId.toString() : "null"; + private void logBatchSummary(int totalArticles, int successCount, int duplicateCount, + int cacheHitCount, int failCount, long duration) { - return ARTICLE_DUPLICATE_CACHE_PREFIX + - fromName + "_" + - fromDomain + "_" + - userIdStr.substring(0, Math.min(8, userIdStr.length())) + "_" + - Math.abs(normalizedTitle.hashCode()); + log.info( + "ARTICLE BATCH: total={}, success={}, duplicate={}, cacheHit={}, failed={}, elapsed={}ms", + totalArticles, successCount, duplicateCount, cacheHitCount, failCount, duration); } } diff --git a/src/main/java/com/newzet/api/article/repository/batch/dto/BatchProcessingResult.java b/src/main/java/com/newzet/api/article/repository/batch/dto/BatchProcessingResult.java deleted file mode 100644 index 332e499a..00000000 --- a/src/main/java/com/newzet/api/article/repository/batch/dto/BatchProcessingResult.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.newzet.api.article.repository.batch.dto; - -import lombok.Data; - -@Data -public class BatchProcessingResult { - private int successCount; - private int failCount; - private int duplicateCount; - private int cacheHitCount; - private int batchDuplicateCount; - - public void incrementFailCount() { - this.failCount++; - } - - public void incrementFailCount(int count) { - this.failCount += count; - } - - public void incrementDuplicateCount() { - this.duplicateCount++; - } - - public void addToDuplicateCount(int count) { - this.duplicateCount += count; - } - - public void incrementCacheHitCount() { - this.cacheHitCount++; - } - - public void incrementBatchDuplicateCount(int count) { - this.batchDuplicateCount += count; - } -} diff --git a/src/main/java/com/newzet/api/common/batch/RedisBatchConsumer.java b/src/main/java/com/newzet/api/common/batch/RedisBatchConsumer.java index a99f6bc7..117578c9 100644 --- a/src/main/java/com/newzet/api/common/batch/RedisBatchConsumer.java +++ b/src/main/java/com/newzet/api/common/batch/RedisBatchConsumer.java @@ -15,6 +15,7 @@ import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.connection.stream.StreamReadOptions; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.stream.StreamReceiver; @@ -80,7 +81,14 @@ public void startProcessing() { if (isProcessing.compareAndSet(false, true)) { executorService = Executors.newSingleThreadExecutor(); ackExecutorService = Executors.newSingleThreadExecutor(); - executorService.submit(this::processBatchesAsync); + executorService.submit(() -> { + log.info("{} consumer starting... Draining pending messages first.", + getProcessorTypeName()); + drainPendingMessages(); + log.info("{} pending messages drained. Starting to listen for new messages.", + getProcessorTypeName()); + processBatchesAsync(); + }); log.info("{} batch processor started with configuration: size={}, timeout={}s", getProcessorTypeName(), batchConfig.getBatchSize(), batchConfig.getTimeoutSeconds()); @@ -102,8 +110,44 @@ public void stopProcessing() { } } + private void drainPendingMessages() { + try { + Consumer consumer = Consumer.from(getConsumerGroup(), getConsumerName()); + StreamOffset offset = StreamOffset.create(getStreamKey(), + ReadOffset.from("0-0")); + StreamReadOptions readOptions = StreamReadOptions.empty() + .count(batchConfig.getBatchSize()); + + while (isProcessing.get()) { + List> rawRecords = redisTemplate.opsForStream() + .read(consumer, readOptions, offset); + + if (rawRecords == null || rawRecords.isEmpty()) { + break; + } + + List> records = new ArrayList<>(); + for (MapRecord rawRecord : rawRecords) { + Map stringMap = new HashMap<>(); + for (Map.Entry entry : rawRecord.getValue().entrySet()) { + stringMap.put(String.valueOf(entry.getKey()), + String.valueOf(entry.getValue())); + } + records.add(MapRecord.create(rawRecord.getStream(), stringMap) + .withId(rawRecord.getId())); + } + + log.info("Processing {} pending messages from drain task.", records.size()); + processBatchWithAck(records); + } + } catch (Exception e) { + log.error("Error draining pending messages for {}: {}", getProcessorTypeName(), + e.getMessage(), e); + } + } + protected void processBatchesAsync() { - log.info("{} batch processing thread initialized", getProcessorTypeName()); + log.info("{} batch processing thread initialized for new messages", getProcessorTypeName()); try { StreamReceiver.StreamReceiverOptions> options = @@ -163,33 +207,36 @@ protected void processBatchWithAck(List> recor } } - processBatchItems(items); - - CompletableFuture.runAsync(() -> { - List ackedMessageIds = new ArrayList<>(); - - for (MapRecord record : records) { - String messageId = record.getId().getValue(); - try { - redisTemplate.opsForStream() - .acknowledge(getStreamKey(), getConsumerGroup(), messageId); - ackedMessageIds.add(messageId); - } catch (Exception e) { - log.warn("Ack failed for {} message {}, skipping for now: {}", - getProcessorTypeName(), messageId, e.getMessage()); + try { + processBatchItems(items); + CompletableFuture.runAsync(() -> { + List ackedMessageIds = new ArrayList<>(); + for (MapRecord record : records) { + try { + redisTemplate.opsForStream() + .acknowledge(getStreamKey(), getConsumerGroup(), record.getId()); + ackedMessageIds.add(record.getId().getValue()); + } catch (Exception e) { + log.warn("Ack failed for {} message {}, skipping for now: {}", + getProcessorTypeName(), record.getId(), e.getMessage()); + } } - } - if (!ackedMessageIds.isEmpty()) { - try { - redisTemplate.opsForStream() - .delete(getStreamKey(), ackedMessageIds.toArray(new String[0])); - } catch (Exception e) { - log.error("Failed to delete acked {} messages: {}", getProcessorTypeName(), - e.getMessage(), e); + if (!ackedMessageIds.isEmpty()) { + try { + redisTemplate.opsForStream() + .delete(getStreamKey(), ackedMessageIds.toArray(new String[0])); + } catch (Exception e) { + log.error("Failed to delete acked {} messages: {}", getProcessorTypeName(), + e.getMessage(), e); + } } - } - }, ackExecutorService); + }, ackExecutorService); + } catch (Exception e) { + log.error( + "{} batch processing failed. Messages will not be acknowledged and will be retried.", + getProcessorTypeName(), e); + } } @PreDestroy diff --git a/src/main/java/com/newzet/api/config/cache/RedisConfig.java b/src/main/java/com/newzet/api/config/cache/RedisConfig.java index 33010e62..ca1d9e41 100644 --- a/src/main/java/com/newzet/api/config/cache/RedisConfig.java +++ b/src/main/java/com/newzet/api/config/cache/RedisConfig.java @@ -16,6 +16,8 @@ import io.lettuce.core.ClientOptions; import io.lettuce.core.SocketOptions; +import io.lettuce.core.resource.ClientResources; +import io.lettuce.core.resource.DefaultClientResources; @Configuration public class RedisConfig { @@ -35,18 +37,31 @@ public LettuceConnectionFactory redisConnectionFactory() { serverConfig.setPassword(RedisPassword.of(password)); LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder() - .commandTimeout(Duration.ofMillis(500)) + .commandTimeout(Duration.ofMillis(1000)) .clientOptions(ClientOptions.builder() .autoReconnect(true) .socketOptions( - SocketOptions.builder().connectTimeout(Duration.ofMillis(1000)).build()) + SocketOptions.builder() + .connectTimeout(Duration.ofMillis(1000)) + .keepAlive(true) + .tcpNoDelay(true) + .build()) .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) .build()) + .clientResources(clientResources()) .build(); return new LettuceConnectionFactory(serverConfig, clientConfig); } + @Bean(destroyMethod = "shutdown") + public ClientResources clientResources() { + return DefaultClientResources.builder() + .ioThreadPoolSize(8) + .computationThreadPoolSize(8) + .build(); + } + @Bean public RedisTemplate redisTemplate() { RedisTemplate redisTemplate = new RedisTemplate<>(); diff --git a/src/main/java/com/newzet/api/fcm/jpa/batch/FcmRedisBatchConsumerImpl.java b/src/main/java/com/newzet/api/fcm/jpa/batch/FcmRedisBatchConsumerImpl.java index 48fe22fe..a6fd3b06 100644 --- a/src/main/java/com/newzet/api/fcm/jpa/batch/FcmRedisBatchConsumerImpl.java +++ b/src/main/java/com/newzet/api/fcm/jpa/batch/FcmRedisBatchConsumerImpl.java @@ -1,6 +1,11 @@ package com.newzet.api.fcm.jpa.batch; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.core.RedisTemplate; @@ -11,7 +16,6 @@ import com.newzet.api.common.objectMapper.OptionalObjectMapper; import com.newzet.api.fcm.business.batch.FcmBatchConsumer; import com.newzet.api.fcm.domain.FcmNotification; -import com.newzet.api.fcm.jpa.batch.dto.FcmBatchProcessingResult; import com.newzet.api.fcm.orchestrator.FcmSenderOrchestrator; import lombok.extern.slf4j.Slf4j; @@ -24,8 +28,11 @@ public class FcmRedisBatchConsumerImpl extends RedisBatchConsumer redisTemplate, ReactiveRedisTemplate reactiveRedisTemplate, @@ -34,6 +41,13 @@ public FcmRedisBatchConsumerImpl(RedisTemplate redisTemplate, FcmSenderOrchestrator fcmSenderOrchestrator) { super(redisTemplate, reactiveRedisTemplate, batchConfig, optionalObjectMapper); this.fcmSenderOrchestrator = fcmSenderOrchestrator; + + this.fcmExecutor = Executors.newFixedThreadPool(PARALLEL_THREADS, r -> { + Thread t = new Thread(r, "fcm-turbo-" + System.nanoTime()); + t.setDaemon(true); + t.setPriority(Thread.NORM_PRIORITY + 1); + return t; + }); } @Override @@ -64,38 +78,98 @@ protected Class getItemClass() { @Override protected void processBatchItems(List fcmNotifications) { long startTime = System.currentTimeMillis(); - FcmBatchProcessingResult result = new FcmBatchProcessingResult(); - for (FcmNotification fcmNotification : fcmNotifications) { - processSingleNotification(fcmNotification, result); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failCount = new AtomicInteger(0); + AtomicInteger invalidTokenCount = new AtomicInteger(0); + + try { + log.info("Processing FCM batch: {} notifications", fcmNotifications.size()); + + processInParallelTurbo(fcmNotifications, successCount, failCount, invalidTokenCount); + + long duration = System.currentTimeMillis() - startTime; + logBatchSummary(fcmNotifications.size(), successCount.get(), failCount.get(), + invalidTokenCount.get(), duration); + + } catch (Exception e) { + log.error("FCM batch processing failed: {}", e.getMessage(), e); + failCount.addAndGet(fcmNotifications.size()); } + } + + private void processInParallelTurbo(List fcmNotifications, + AtomicInteger successCount, AtomicInteger failCount, AtomicInteger invalidTokenCount) { + + try { + CompletableFuture[] futures = fcmNotifications.stream() + .map(notification -> CompletableFuture.runAsync(() -> { + processSingleNotificationFast(notification, successCount, failCount, + invalidTokenCount); + }, fcmExecutor) + .orTimeout(5, TimeUnit.SECONDS)) + .toArray(CompletableFuture[]::new); - long duration = System.currentTimeMillis() - startTime; - logBatchSummary(fcmNotifications.size(), result, duration); + CompletableFuture.allOf(futures) + .get(BATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS); + + } catch (Exception e) { + log.error("FCM processing failed: {}", e.getMessage()); + int totalProcessed = successCount.get() + failCount.get() + invalidTokenCount.get(); + int remainingFails = fcmNotifications.size() - totalProcessed; + if (remainingFails > 0) { + failCount.addAndGet(remainingFails); + log.warn("FCM batch incomplete: {} remaining marked as failed", remainingFails); + } + } } - private void processSingleNotification(FcmNotification fcmNotification, - FcmBatchProcessingResult result) { + private void processSingleNotificationFast(FcmNotification fcmNotification, + AtomicInteger successCount, AtomicInteger failCount, AtomicInteger invalidTokenCount) { + if (!fcmNotification.isValid()) { - result.incrementInvalidTokenCount(); log.warn("Invalid FCM notification: userId={}, token={}", - fcmNotification.getUserId(), fcmNotification.getToken()); + fcmNotification.getUserId(), fcmNotification.getToken()); + invalidTokenCount.incrementAndGet(); return; } try { fcmSenderOrchestrator.send(fcmNotification); - result.incrementSuccessCount(); + successCount.incrementAndGet(); + } catch (Exception e) { - result.incrementFailCount(); + failCount.incrementAndGet(); + log.trace("FCM send failed for userId {}: {}", + fcmNotification.getUserId(), e.getMessage()); } } - private void logBatchSummary(int totalNotifications, FcmBatchProcessingResult result, - long duration) { + private void logBatchSummary(int totalNotifications, int successCount, int failCount, + int invalidTokenCount, long duration) { + log.info( - "FCM BATCH SUMMARY: total={}, success={}, failed={}, invalidToken={}, elapsed={}ms", - totalNotifications, result.getSuccessCount(), result.getFailCount(), - result.getInvalidTokenCount(), duration); + "FCM BATCH: total={}, success={}, failed={}, invalid={}, elapsed={}ms", + totalNotifications, successCount, failCount, invalidTokenCount, duration); + } + + @Override + public void onShutdown() { + super.onShutdown(); + if (fcmExecutor != null && !fcmExecutor.isShutdown()) { + log.info("Shutting down FCM executor with {} threads", PARALLEL_THREADS); + fcmExecutor.shutdown(); + try { + if (!fcmExecutor.awaitTermination(3, TimeUnit.SECONDS)) { + fcmExecutor.shutdownNow(); + if (!fcmExecutor.awaitTermination(2, TimeUnit.SECONDS)) { + log.warn("FCM executor did not terminate gracefully"); + } + } + } catch (InterruptedException e) { + fcmExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } } } diff --git a/src/test/java/com/newzet/api/article/repository/ArticleRepositoryTest.java b/src/test/java/com/newzet/api/article/repository/ArticleRepositoryTest.java index 6d199b51..1748cb98 100644 --- a/src/test/java/com/newzet/api/article/repository/ArticleRepositoryTest.java +++ b/src/test/java/com/newzet/api/article/repository/ArticleRepositoryTest.java @@ -146,21 +146,6 @@ void saveAll_WhenEmptyList_ThenReturnEmptyList() { verify(entityManager, never()).clear(); } - @Test - void saveAll_WhenExceptionOccurs_ThenShouldNotPropagateException() { - // Given - List dtos = createMockArticleDtos(3); - - doThrow(new RuntimeException("Test exception")).when(entityManager) - .persist(any(ArticleEntity.class)); - - // When - List result = articleRepository.saveAll(dtos); - - // Then - assertThat(result).hasSize(3); - } - @Test void saveAll_WhenProcessingNullFields_ThenHandleGracefully() { // Given @@ -183,26 +168,6 @@ void saveAll_WhenProcessingNullFields_ThenHandleGracefully() { assertThat(capturedEntity.getFromDomain()).isNull(); } - @Test - void saveAll_WhenPersistThrowsException_ShouldContinueSavingOthers() { - // Given - List dtos = createMockArticleDtos(3); - - doThrow(new RuntimeException("persist fail")) - .doNothing() - .doNothing() - .when(entityManager).persist(any(ArticleEntity.class)); - - // When - List result = articleRepository.saveAll(dtos); - - // Then - assertThat(result).hasSize(3); - verify(entityManager, times(3)).persist(any(ArticleEntity.class)); - verify(entityManager, atLeast(1)).flush(); - verify(entityManager, atLeast(1)).clear(); - } - private List createMockArticleDtos(int count) { return IntStream.range(0, count) .mapToObj(i -> Article.createNewArticle(