Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.example.cs25batch.adapter;

import jakarta.annotation.Nullable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.StringRedisTemplate;

@RequiredArgsConstructor
public class RedisStreamsClient {

private final StringRedisTemplate redisTemplate;
private final String stream;
private final String group;
private final String consumer;

@Nullable
public MapRecord<String, Object, Object> readWithConsumerGroup(Duration blockTimeout) {

StreamReadOptions options = StreamReadOptions.empty().count(1).block(blockTimeout);

List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream().read(
Consumer.from(group, consumer),
options,
StreamOffset.create(stream, ReadOffset.lastConsumed())
);

return (records == null || records.isEmpty()) ? null : records.get(0);
}

public void ack(String recordId) {
redisTemplate.opsForStream().acknowledge(stream, group, RecordId.of(recordId));
}

public void del(String recordId) {
redisTemplate.opsForStream().delete(stream, RecordId.of(recordId));
}

public void ackAndDel(String recordId) {
RecordId id = RecordId.of(recordId);
redisTemplate.opsForStream().acknowledge(stream, group, id);
redisTemplate.opsForStream().delete(stream, id);
}

public void addDlq(String dlqStream, Map<String, String> message){
redisTemplate.opsForStream().add(dlqStream, message);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.example.cs25batch.aop;

import com.example.cs25batch.adapter.RedisStreamsClient;
import com.example.cs25batch.batch.dto.MailDto;
import com.example.cs25entity.domain.mail.entity.MailLog;
import com.example.cs25entity.domain.mail.enums.MailStatus;
Expand All @@ -26,7 +27,7 @@
public class MailLogAspect {

private final MailLogRepository mailLogRepository;
private final StringRedisTemplate redisTemplate;
private final RedisStreamsClient redisClient;

@Around("execution(* com.example.cs25batch.sender.context.MailSenderContext.send(..))")
public Object logMailSend(ProceedingJoinPoint joinPoint) throws Throwable {
Expand Down Expand Up @@ -66,7 +67,7 @@ public Object logMailSend(ProceedingJoinPoint joinPoint) throws Throwable {
"subscriptionId", subscription.getId().toString(),
"quizId", quiz.getId().toString()
);
redisTemplate.opsForStream().add("quiz-email-retry-stream", retryMessage);
redisClient.addDlq("quiz-email-retry-stream", retryMessage);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.example.cs25batch.batch.component.processor;

import com.example.cs25batch.adapter.RedisStreamsClient;
import com.example.cs25batch.batch.dto.MailDto;
import com.example.cs25batch.batch.service.TodayQuizService;
import com.example.cs25entity.domain.quiz.entity.Quiz;
import com.example.cs25entity.domain.quiz.exception.QuizException;
import com.example.cs25entity.domain.subscription.entity.Subscription;
import com.example.cs25entity.domain.subscription.repository.SubscriptionRepository;
import java.util.Map;
Expand All @@ -18,6 +20,7 @@ public class MailConsumerAsyncProcessor implements ItemProcessor<Map<String, Str

private final SubscriptionRepository subscriptionRepository;
private final TodayQuizService todayQuizService;
private final RedisStreamsClient redisClient;

@Override
public MailDto process(Map<String, String> message) throws Exception {
Expand All @@ -37,14 +40,19 @@ public MailDto process(Map<String, String> message) throws Exception {
}

//Quiz 출제
Quiz quiz = todayQuizService.getTodayQuizBySubscription(subscription);
try {
Quiz quiz = todayQuizService.getTodayQuizBySubscription(subscription);
return MailDto.builder()
.subscription(subscription)
.quiz(quiz)
.recordId(recordId)
.build();
} catch(QuizException e){
//문제 출제 실패로 인한 예외 발생 시, 기존 Queue에 있는 데이터 삭제
redisClient.ackAndDel(recordId);
return null;
}
//long quizEnd = System.currentTimeMillis();
//log.info("[5. 문제 출제] QuizId : {} {}ms", quiz.getId(), quizEnd - quizStart);

return MailDto.builder()
.subscription(subscription)
.quiz(quiz)
.recordId(recordId)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -1,60 +1,49 @@
package com.example.cs25batch.batch.component.reader;

import com.example.cs25batch.adapter.RedisStreamsClient;
import com.example.cs25batch.sender.context.MailSenderContext;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.ConsumptionProbe;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@RequiredArgsConstructor
@Component("redisConsumeReader")
public class RedisStreamReader implements ItemReader<Map<String, String>> {

private static final String STREAM = "quiz-email-stream";
private static final String GROUP = "mail-consumer-group";
private static final String CONSUMER = "mail-worker";

@Value("${mail.strategy:javaBatchMailSender}")
private String strategyKey;

private final StringRedisTemplate redisTemplate;
private final RedisStreamsClient redisClient;
private final MailSenderContext mailSenderContext;

@Override
public Map<String, String> read() throws InterruptedException {
//long start = System.currentTimeMillis();
Bucket bucket = mailSenderContext.getBucket(strategyKey);

while (!bucket.tryConsume(1)) {
/*
while (!mailSenderContext.tryConsume(strategyKey, 1L)) {
Thread.sleep(200); //토큰을 얻을 때까지 간격을 두고 재시도
}
*/
mailSenderContext.acquirePermitOrWait(strategyKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Reader에서 토큰을 선점하면 ‘메시지 없음’에도 토큰이 소모됩니다 — 토큰 획득 호출 제거 권장(Writer로 이동)

현재 acquirePermitOrWait는 토큰을 실제로 소비합니다(tryConsumeAndReturnRemaining 사용). 메시지가 없거나 빈 레코드일 때도 처리량을 깎는 문제가 생깁니다. 토큰 획득은 MailWriter에서 실제 발송 직전에 수행하세요.

-        mailSenderContext.acquirePermitOrWait(strategyKey);
+        // 토큰 획득은 MailWriter에서 실제 발송 직전에 수행

MailWriter 측 반영은 별도 코멘트의 diff를 참고하세요.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
mailSenderContext.acquirePermitOrWait(strategyKey);
// 토큰 획득은 MailWriter에서 실제 발송 직전에 수행
🤖 Prompt for AI Agents
In
cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java
around line 41, remove the call to
mailSenderContext.acquirePermitOrWait(strategyKey) from the reader because it
consumes a token even when there are no messages (tryConsumeAndReturnRemaining
behavior); instead, ensure token acquisition is performed in MailWriter
immediately before sending each mail (per the provided MailWriter diff). Update
the reader to not call or check permits at all, and confirm MailWriter calls
acquirePermitOrWait(strategyKey) right before the actual send so tokens are only
consumed for real send attempts.


List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream().read(
Consumer.from(GROUP, CONSUMER),
StreamReadOptions.empty().count(1).block(Duration.ofMillis(500)),
StreamOffset.create(STREAM, ReadOffset.lastConsumed())
);

if (records == null || records.isEmpty()) {
return null;
}
MapRecord<String, Object, Object> msg = redisClient.readWithConsumerGroup(Duration.ofMillis(500));
//redisTemplate.opsForStream().acknowledge(STREAM, GROUP, msg.getId());

MapRecord<String, Object, Object> msg = records.get(0);
redisTemplate.opsForStream().acknowledge(STREAM, GROUP, msg.getId());
if(msg == null || msg.getValue().isEmpty()) return null;

Map<String, String> data = new HashMap<>();
Object subscriptionId = msg.getValue().get("subscriptionId");
Expand All @@ -65,7 +54,6 @@ public Map<String, String> read() throws InterruptedException {

//long end = System.currentTimeMillis();
//log.info("[3. Queue에서 꺼내기] {}ms", end - start);

return data;
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package com.example.cs25batch.batch.component.writer;

import com.example.cs25batch.adapter.RedisStreamsClient;
import com.example.cs25batch.batch.dto.MailDto;
import com.example.cs25batch.sender.context.MailSenderContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Slf4j
Expand All @@ -17,7 +16,7 @@
public class MailWriter implements ItemWriter<MailDto> {

private final MailSenderContext mailSenderContext;
private final StringRedisTemplate redisTemplate;
private final RedisStreamsClient streamsClient;

@Value("${mail.strategy:javaBatchMailSender}")
private String strategyKey;
Expand All @@ -34,17 +33,8 @@ public void write(Chunk<? extends MailDto> items) throws Exception {
// 에러 로깅 또는 알림 처리
System.err.println("메일 발송 실패: " + e.getMessage());
} finally {
deleteStreamRecord(mail.getRecordId());
streamsClient.ackAndDel(mail.getRecordId());
}
}
}

private void deleteStreamRecord(String recordIdStr){
try {
RecordId recordId = RecordId.of(recordIdStr);
redisTemplate.opsForStream().delete("quiz-email-stream", recordId);
} catch (Exception e) {
log.warn("Redis 스트림 레코드 삭제 실패: recordId = {}, error = {}", recordIdStr, e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.example.cs25batch.config;

import com.example.cs25batch.adapter.RedisStreamsClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;

@Configuration
public class RedisStreamsConfig {
@Bean
public RedisStreamsClient quizEmailStreamsClient(StringRedisTemplate redisTemplate) {
return new RedisStreamsClient(
redisTemplate,
"quiz-email-stream", // stream 이름
"mail-consumer-group", // group
"mail-worker" // consumer
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import com.example.cs25batch.batch.service.JavaMailService;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.ConsumptionProbe;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

Expand All @@ -17,7 +21,7 @@ public class JavaMailSenderStrategy implements MailSenderStrategy{
.addLimit(
Bandwidth.builder()
.capacity(4)
.refillGreedy(2, Duration.ofMillis(500))
.refillGreedy(4, Duration.ofMillis(1000))
.build()
)
.build();
Expand All @@ -28,7 +32,21 @@ public void sendQuizMail(MailDto mailDto) {
}

@Override
public Bucket getBucket() {
return bucket;
public boolean tryConsume(Long num){
return bucket.tryConsume(num);
}

@Override
public void acquirePermitOrWait(){
while (true) {
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) return;

long nanos = probe.getNanosToWaitForRefill();
long jitter = TimeUnit.MILLISECONDS.toNanos(
ThreadLocalRandom.current().nextInt(0, 50)
);
LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1)));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.example.cs25batch.sender;

import com.example.cs25batch.batch.dto.MailDto;
import io.github.bucket4j.Bucket;

public interface MailSenderStrategy {
void sendQuizMail(MailDto mailDto);

Bucket getBucket();
boolean tryConsume(Long num);

void acquirePermitOrWait();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import com.example.cs25batch.batch.service.SesMailService;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.ConsumptionProbe;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

Expand All @@ -18,7 +22,7 @@ public class SesMailSenderStrategy implements MailSenderStrategy{
.addLimit(
Bandwidth.builder()
.capacity(14)
.refillGreedy(7, Duration.ofMillis(500))
.refillGreedy(14, Duration.ofMillis(1000))
.build()
)
.build();
Expand All @@ -29,7 +33,21 @@ public void sendQuizMail(MailDto mailDto) {
}

@Override
public Bucket getBucket() {
return bucket;
public boolean tryConsume(Long num){
return bucket.tryConsume(num);
}

@Override
public void acquirePermitOrWait(){
while (true) {
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) return;

long nanos = probe.getNanosToWaitForRefill();
long jitter = TimeUnit.MILLISECONDS.toNanos(
ThreadLocalRandom.current().nextInt(0, 50)
);
LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1)));
}
Comment on lines +41 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

acquirePermitOrWait에서도 인터럽트 신호 처리 추가 권장

JavaMail 전략과 동일한 이유로 인터럽트 플래그를 확인하여 루프를 탈출하세요.

     @Override
     public void acquirePermitOrWait(){
         while (true) {
+            if (Thread.currentThread().isInterrupted()) {
+                return;
+            }
             ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
             if (probe.isConsumed()) return;
 
             long nanos = probe.getNanosToWaitForRefill();
             long jitter = TimeUnit.MILLISECONDS.toNanos(
                 ThreadLocalRandom.current().nextInt(0, 50)
             );
             LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1)));
         }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public void acquirePermitOrWait(){
while (true) {
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) return;
long nanos = probe.getNanosToWaitForRefill();
long jitter = TimeUnit.MILLISECONDS.toNanos(
ThreadLocalRandom.current().nextInt(0, 50)
);
LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1)));
}
@Override
public void acquirePermitOrWait(){
while (true) {
if (Thread.currentThread().isInterrupted()) {
return;
}
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) return;
long nanos = probe.getNanosToWaitForRefill();
long jitter = TimeUnit.MILLISECONDS.toNanos(
ThreadLocalRandom.current().nextInt(0, 50)
);
LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1)));
}
}
🤖 Prompt for AI Agents
In
cs25-batch/src/main/java/com/example/cs25batch/sender/SesMailSenderStrategy.java
around lines 41 to 51, the acquirePermitOrWait loop does not handle thread
interruptions; add an interrupt check inside the loop (e.g., if
(Thread.currentThread().isInterrupted()) { Thread.currentThread().interrupt();
return; }) so the method exits promptly when the thread is interrupted,
preserving the interrupt flag and preventing an endless wait.

}
}
Loading