Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,7 @@ public List<ArticleEntityDto> saveAll(List<ArticleEntityDto> articleEntityDtoLis
int processedArticles = 0;

for (ArticleEntity entityToSave : entitiesToSave) {
try {
entityManager.persist(entityToSave);
} catch (Exception e) {
log.error("Failed to persist ArticleEntity: {}", entityToSave, e);
}
entityManager.persist(entityToSave);
result.add(entityToSave.toEntityDto());
processedArticles++;
if (processedArticles % BATCH_SIZE == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.RedisTemplate;
Expand All @@ -15,7 +15,6 @@
import com.newzet.api.article.business.dto.ArticleEntityDto;
import com.newzet.api.article.business.repository.ArticleRepository;
import com.newzet.api.article.domain.Article;
import com.newzet.api.article.repository.batch.dto.BatchProcessingResult;
import com.newzet.api.article.repository.batch.dto.BatchSaveData;
import com.newzet.api.common.batch.RedisBatchConsumer;
import com.newzet.api.common.batch.config.BatchConfig;
Expand All @@ -34,6 +33,7 @@ public class ArticleRedisBatchConsumerImpl extends RedisBatchConsumer<Article>
private static final String CONSUMER_NAME = "article-processor";
private static final String ARTICLE_DUPLICATE_CACHE_PREFIX = "article:dup:";
private static final long DUPLICATE_CACHE_TTL_MINUTES = 10;
private static final int REDIS_BATCH_SIZE = 50;

private final ArticleRepository articleRepository;
private final FcmSenderOrchestrator fcmSenderOrchestrator;
Expand Down Expand Up @@ -77,154 +77,182 @@ protected Class<Article> getItemClass() {
@Override
protected void processBatchItems(List<Article> articles) {
long startTime = System.currentTimeMillis();
BatchProcessingResult result = new BatchProcessingResult();

Map<String, List<ArticleEntityDto>> keyToArticlesMap = prepareArticlesWithCacheKeys(
articles, result);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger duplicateCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0);
AtomicInteger cacheHitCount = new AtomicInteger(0);

if (keyToArticlesMap.isEmpty()) {
log.info("No articles to process");
Map<String, ArticleEntityDto> uniqueArticlesMap = removeBatchDuplicates(articles,
duplicateCount, failCount);

if (uniqueArticlesMap.isEmpty()) {
log.info("No articles to process after batch deduplication");
return;
}

BatchSaveData saveData = identifyUniqueArticles(keyToArticlesMap, result);
BatchSaveData saveData = identifyUniqueArticlesWithBatch(uniqueArticlesMap,
duplicateCount, cacheHitCount, failCount);

List<ArticleEntityDto> savedArticles = saveToDatabaseOptimized(saveData.toSave(),
successCount, failCount);

saveToDatabaseAndUpdateCounters(saveData.toSave(), result);
updateRedisCacheAsync(saveData.toCache());

updateRedisCache(saveData.toCache());
sendFCMAsync(savedArticles);

long duration = System.currentTimeMillis() - startTime;
logBatchSummary(articles.size(), result, duration);
logBatchSummary(articles.size(), successCount.get(), duplicateCount.get(),
cacheHitCount.get(), failCount.get(), duration);
}

private Map<String, List<ArticleEntityDto>> prepareArticlesWithCacheKeys(
List<Article> articles, BatchProcessingResult result) {
Map<String, List<ArticleEntityDto>> keyToArticlesMap = new HashMap<>();
private Map<String, ArticleEntityDto> removeBatchDuplicates(List<Article> articles,
AtomicInteger duplicateCount, AtomicInteger failCount) {

Map<String, ArticleEntityDto> uniqueArticlesMap = new HashMap<>();

for (Article article : articles) {
try {
ArticleEntityDto entityDto = ArticleEntityDto.fromDomain(article);
String cacheKey = generateOptimizedCacheKey(entityDto);

String cacheKey = generateSimpleCacheKey(
entityDto.getFromName(),
entityDto.getFromDomain(),
entityDto.getTitle(),
entityDto.getToUserId()
);
if (uniqueArticlesMap.containsKey(cacheKey)) {
duplicateCount.incrementAndGet();
} else {
uniqueArticlesMap.put(cacheKey, entityDto);
}

keyToArticlesMap.computeIfAbsent(cacheKey, k -> new ArrayList<>())
.add(entityDto);
} catch (Exception e) {
result.incrementFailCount();
log.error("Failed to process article: {}, error: {}", article.getTitle(),
e.getMessage(), e);
failCount.incrementAndGet();
log.error("Failed to process article: {}", article.getTitle(), e);
}
}

return keyToArticlesMap;
return uniqueArticlesMap;
}

private BatchSaveData identifyUniqueArticles(
Map<String, List<ArticleEntityDto>> keyToArticlesMap, BatchProcessingResult result) {
private BatchSaveData identifyUniqueArticlesWithBatch(
Map<String, ArticleEntityDto> uniqueArticlesMap,
AtomicInteger duplicateCount, AtomicInteger cacheHitCount, AtomicInteger failCount) {

List<ArticleEntityDto> toSave = new ArrayList<>();
Map<String, String> toCache = new HashMap<>();

for (Map.Entry<String, List<ArticleEntityDto>> entry : keyToArticlesMap.entrySet()) {
String cacheKey = entry.getKey();
List<ArticleEntityDto> entitiesWithSameKey = entry.getValue();

if (entitiesWithSameKey.size() > 1) {
int batchDuplicates = entitiesWithSameKey.size() - 1;
result.incrementBatchDuplicateCount(batchDuplicates);
log.info("BATCH DUPLICATES: {} articles with same key '{}'",
entitiesWithSameKey.size(), cacheKey);
}

ArticleEntityDto entityDto = entitiesWithSameKey.get(0);
List<String> cacheKeys = new ArrayList<>(uniqueArticlesMap.keySet());

String cachedValue = redisTemplate.opsForValue().get(cacheKey);
for (int i = 0; i < cacheKeys.size(); i += REDIS_BATCH_SIZE) {
int endIndex = Math.min(i + REDIS_BATCH_SIZE, cacheKeys.size());
List<String> batchKeys = cacheKeys.subList(i, endIndex);

if (cachedValue != null) {
result.incrementDuplicateCount();
result.incrementCacheHitCount();
log.info(
"REDIS DUPLICATE: '{}' with key '{}', counters: duplicate={}, cacheHit={}",
entityDto.getTitle(), cacheKey, result.getDuplicateCount(),
result.getCacheHitCount());
} else {
toSave.add(entityDto);
markForCaching(toCache, cacheKey);
try {
List<String> cachedValues = redisTemplate.opsForValue().multiGet(batchKeys);

for (int j = 0; j < batchKeys.size(); j++) {
String cacheKey = batchKeys.get(j);
String cachedValue = cachedValues.get(j);
ArticleEntityDto entityDto = uniqueArticlesMap.get(cacheKey);

if (cachedValue != null) {
duplicateCount.incrementAndGet();
cacheHitCount.incrementAndGet();
} else {
toSave.add(entityDto);
toCache.put(cacheKey, "1");
}
}
} catch (Exception e) {
log.error("Redis batch query failed: {}", e.getMessage());
failCount.addAndGet(batchKeys.size());
// Redis 실패 시 모든 아티클을 저장 대상으로 (안전장치)
for (String key : batchKeys) {
toSave.add(uniqueArticlesMap.get(key));
toCache.put(key, "1");
}
}
}

result.addToDuplicateCount(result.getBatchDuplicateCount());

return new BatchSaveData(toSave, toCache);
}

private void saveToDatabaseAndUpdateCounters(List<ArticleEntityDto> toSave,
BatchProcessingResult result) {
if (!toSave.isEmpty()) {
try {
List<ArticleEntityDto> saved = articleRepository.saveAll(toSave);
result.setSuccessCount(saved.size());
sendFCM(saved); // fcm 메시지 전송 배치처리
} catch (Exception e) {
result.incrementFailCount(toSave.size());
log.error("SAVE FAILED for {} articles: {}", toSave.size(), e.getMessage(), e);
}
} else {
log.info("No new articles to save (all are duplicates or failed)");
/**
 * Persists the deduplicated articles and updates the shared counters.
 *
 * @param toSave       articles that passed both batch and Redis deduplication
 * @param successCount incremented by the number of rows actually persisted
 * @param failCount    incremented by the batch size when the save fails
 * @return the persisted DTOs, or an empty list when nothing was saved
 */
private List<ArticleEntityDto> saveToDatabaseOptimized(List<ArticleEntityDto> toSave,
    AtomicInteger successCount, AtomicInteger failCount) {

    if (toSave.isEmpty()) {
        return List.of();
    }

    try {
        List<ArticleEntityDto> saved = articleRepository.saveAll(toSave);
        successCount.addAndGet(saved.size());
        log.info("Successfully saved {} articles to database", saved.size());
        return saved;
    } catch (Exception e) {
        // Without this guard a repository failure aborts the whole batch and
        // failCount (a declared parameter) is never updated or logged.
        failCount.addAndGet(toSave.size());
        log.error("SAVE FAILED for {} articles", toSave.size(), e);
        return List.of();
    }
}

private void sendFCM(List<ArticleEntityDto> saved) {
if (!saved.isEmpty()) {
for (ArticleEntityDto articleData : saved) {
fcmSenderOrchestrator.sendFcmWhenMailReceivedBatch(articleData.getToUserId(),
articleData.getId(), articleData.getCreatedAt(),
articleData.getTitle(), articleData.getFromName());
}
private void updateRedisCacheAsync(Map<String, String> toCache) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

캐시 업데이트를 비동기적으로 수행하도록 변경하였다고 하셨는데, 순차적으로 처리되는 다음 배치 작업 이전에는 캐시가 업데이트 되는건가요?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100프로 보장하지는 않습니다..! 비동기의 한계가 아닌가 싶네요 😢

if (toCache.isEmpty()) {
return;
}
}

private void updateRedisCache(Map<String, String> toCache) {
if (!toCache.isEmpty()) {
for (Map.Entry<String, String> entry : toCache.entrySet()) {
try {
String key = entry.getKey();
redisTemplate.opsForValue().setIfAbsent(
key, entry.getValue(), Duration.ofMinutes(DUPLICATE_CACHE_TTL_MINUTES));
} catch (Exception e) {
log.error("Cache update ERROR for key '{}': {}", entry.getKey(),
e.getMessage());
reactiveRedisTemplate.opsForValue()
.multiSet(toCache)
.doOnSuccess(result -> {
if (Boolean.TRUE.equals(result)) {
log.debug("Cache updated for {} keys", toCache.size());
}
})
.doOnError(error -> log.error("Cache update failed: {}", error.getMessage()))
.subscribe();

for (String key : toCache.keySet()) {
reactiveRedisTemplate.expire(key, Duration.ofMinutes(DUPLICATE_CACHE_TTL_MINUTES))
.doOnError(
error -> log.warn("TTL setting failed for key {}: {}", key, error.getMessage()))
.subscribe();
}
}

/**
 * Sends one FCM push notification per newly saved article; a failure on one
 * article is logged and does not stop delivery for the rest of the batch.
 *
 * @param savedArticles articles that were persisted in this batch run
 */
private void sendFCMAsync(List<ArticleEntityDto> savedArticles) {
    if (savedArticles.isEmpty()) {
        return;
    }

    for (ArticleEntityDto articleData : savedArticles) {
        try {
            fcmSenderOrchestrator.sendFcmWhenMailReceivedBatch(
                articleData.getToUserId(),
                articleData.getId(),
                articleData.getCreatedAt(),
                articleData.getTitle(),
                articleData.getFromName()
            );
        } catch (Exception e) {
            // Pass the throwable as the final argument so SLF4J records the
            // full stack trace instead of only e.getMessage().
            log.error("FCM send failed for article {}", articleData.getId(), e);
        }
    }
}

private void logBatchSummary(int totalArticles, BatchProcessingResult result, long duration) {
log.info(
"ARTICLE BATCH SUMMARY: total={}, success={}, duplicate={}, cacheHit={}, failed={}, elapsed={}ms",
totalArticles, result.getSuccessCount(), result.getDuplicateCount(),
result.getCacheHitCount(), result.getFailCount(), duration);
/**
 * Builds the Redis duplicate-detection key for an article.
 *
 * Key shape: {@code article:dup:<fromName>:<fromDomain>:<userIdPrefix>:<titleHash>}.
 * The title is hashed rather than embedded to keep keys short, and only a
 * prefix of the user id is used for the same reason.
 *
 * @param entityDto article to derive the key from
 * @return the cache key string
 */
private String generateOptimizedCacheKey(ArticleEntityDto entityDto) {
    String fromName = normalizeString(entityDto.getFromName());
    String fromDomain = normalizeString(entityDto.getFromDomain());
    String title = normalizeString(entityDto.getTitle());

    // Guard against a null user id and ids shorter than 8 characters:
    // the unguarded substring(0, 8) could throw NPE/StringIndexOutOfBounds.
    String userIdStr = entityDto.getToUserId() != null
        ? entityDto.getToUserId().toString()
        : "null";
    String userId = userIdStr.substring(0, Math.min(8, userIdStr.length()));

    // Widen to long before Math.abs: Math.abs(Integer.MIN_VALUE) stays negative.
    long titleHash = Math.abs((long) title.hashCode());

    return ARTICLE_DUPLICATE_CACHE_PREFIX +
        fromName + ":" +
        fromDomain + ":" +
        userId + ":" +
        titleHash;
}

private void markForCaching(Map<String, String> cacheMap, String cacheKey) {
cacheMap.put(cacheKey, "1");
/**
 * Canonicalizes a string for cache-key comparison: trims surrounding
 * whitespace and lower-cases it. A {@code null} input yields the empty string.
 *
 * @param input raw value, possibly {@code null}
 * @return normalized, never-null string
 */
private String normalizeString(String input) {
    if (input == null) {
        return "";
    }
    return input.trim().toLowerCase();
}

private String generateSimpleCacheKey(String fromName, String fromDomain, String title,
UUID toUserId) {
fromName = fromName != null ? fromName.trim().toLowerCase() : "";
fromDomain = fromDomain != null ? fromDomain.trim().toLowerCase() : "";
String normalizedTitle = title != null ? title.trim().toLowerCase() : "";
String userIdStr = toUserId != null ? toUserId.toString() : "null";
private void logBatchSummary(int totalArticles, int successCount, int duplicateCount,
int cacheHitCount, int failCount, long duration) {

return ARTICLE_DUPLICATE_CACHE_PREFIX +
fromName + "_" +
fromDomain + "_" +
userIdStr.substring(0, Math.min(8, userIdStr.length())) + "_" +
Math.abs(normalizedTitle.hashCode());
log.info(
"ARTICLE BATCH: total={}, success={}, duplicate={}, cacheHit={}, failed={}, elapsed={}ms",
totalArticles, successCount, duplicateCount, cacheHitCount, failCount, duration);
}
}

This file was deleted.

Loading
Loading