Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,14 @@

import com.example.cs25batch.adapter.RedisStreamsClient;
import com.example.cs25batch.sender.context.MailSenderContext;
import io.github.bucket4j.Bucket;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Slf4j
Expand All @@ -35,9 +27,12 @@ public class RedisStreamReader implements ItemReader<Map<String, String>> {
public Map<String, String> read() throws InterruptedException {
//long start = System.currentTimeMillis();

/*
while (!mailSenderContext.tryConsume(strategyKey, 1L)) {
Thread.sleep(200); //토큰을 얻을 때까지 간격을 두고 재시도
}
*/
mailSenderContext.acquirePermitOrWait(strategyKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Reader에서 토큰을 선점하면 ‘메시지 없음’에도 토큰이 소모됩니다 — 토큰 획득 호출 제거 권장(Writer로 이동)

현재 acquirePermitOrWait는 토큰을 실제로 소비합니다(tryConsumeAndReturnRemaining 사용). 메시지가 없거나 빈 레코드일 때도 처리량을 깎는 문제가 생깁니다. 토큰 획득은 MailWriter에서 실제 발송 직전에 수행하세요.

-        mailSenderContext.acquirePermitOrWait(strategyKey);
+        // 토큰 획득은 MailWriter에서 실제 발송 직전에 수행

MailWriter 측 반영은 별도 코멘트의 diff를 참고하세요.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
mailSenderContext.acquirePermitOrWait(strategyKey);
// 토큰 획득은 MailWriter에서 실제 발송 직전에 수행
🤖 Prompt for AI Agents
In
cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java
around line 41, remove the call to
mailSenderContext.acquirePermitOrWait(strategyKey) from the reader because it
consumes a token even when there are no messages (tryConsumeAndReturnRemaining
behavior); instead, ensure token acquisition is performed in MailWriter
immediately before sending each mail (per the provided MailWriter diff). Update
the reader to not call or check permits at all, and confirm MailWriter calls
acquirePermitOrWait(strategyKey) right before the actual send so tokens are only
consumed for real send attempts.


MapRecord<String, Object, Object> msg = redisClient.readWithConsumerGroup(Duration.ofMillis(500));
//redisTemplate.opsForStream().acknowledge(STREAM, GROUP, msg.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import com.example.cs25batch.batch.service.JavaMailService;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.ConsumptionProbe;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

Expand All @@ -17,7 +21,7 @@ public class JavaMailSenderStrategy implements MailSenderStrategy{
.addLimit(
Bandwidth.builder()
.capacity(4)
.refillGreedy(4, Duration.ofMillis(1000))
.refillGreedy(2, Duration.ofMillis(500))
.build()
)
.build();
Expand All @@ -31,4 +35,18 @@ public void sendQuizMail(MailDto mailDto) {
public boolean tryConsume(Long num){
return bucket.tryConsume(num);
}

@Override
public void acquirePermitOrWait(){
while (true) {
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) return;

long nanos = probe.getNanosToWaitForRefill();
long jitter = TimeUnit.MILLISECONDS.toNanos(
ThreadLocalRandom.current().nextInt(0, 50)
);
LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1)));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.example.cs25batch.sender;

import com.example.cs25batch.batch.dto.MailDto;
import io.github.bucket4j.Bucket;

public interface MailSenderStrategy {
void sendQuizMail(MailDto mailDto);

boolean tryConsume(Long num);

void acquirePermitOrWait();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import com.example.cs25batch.batch.service.SesMailService;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.ConsumptionProbe;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

Expand Down Expand Up @@ -32,4 +36,18 @@ public void sendQuizMail(MailDto mailDto) {
public boolean tryConsume(Long num){
return bucket.tryConsume(num);
}

@Override
public void acquirePermitOrWait(){
while (true) {
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) return;

long nanos = probe.getNanosToWaitForRefill();
long jitter = TimeUnit.MILLISECONDS.toNanos(
ThreadLocalRandom.current().nextInt(0, 50)
);
LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1)));
}
Comment on lines +41 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

acquirePermitOrWait에서도 인터럽트 신호 처리 추가 권장

JavaMail 전략과 동일한 이유로 인터럽트 플래그를 확인하여 루프를 탈출하세요.

     @Override
     public void acquirePermitOrWait(){
         while (true) {
+            if (Thread.currentThread().isInterrupted()) {
+                return;
+            }
             ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
             if (probe.isConsumed()) return;
 
             long nanos = probe.getNanosToWaitForRefill();
             long jitter = TimeUnit.MILLISECONDS.toNanos(
                 ThreadLocalRandom.current().nextInt(0, 50)
             );
             LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1)));
         }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public void acquirePermitOrWait(){
while (true) {
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) return;
long nanos = probe.getNanosToWaitForRefill();
long jitter = TimeUnit.MILLISECONDS.toNanos(
ThreadLocalRandom.current().nextInt(0, 50)
);
LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1)));
}
@Override
public void acquirePermitOrWait(){
while (true) {
if (Thread.currentThread().isInterrupted()) {
return;
}
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) return;
long nanos = probe.getNanosToWaitForRefill();
long jitter = TimeUnit.MILLISECONDS.toNanos(
ThreadLocalRandom.current().nextInt(0, 50)
);
LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1)));
}
}
🤖 Prompt for AI Agents
In
cs25-batch/src/main/java/com/example/cs25batch/sender/SesMailSenderStrategy.java
around lines 41 to 51, the acquirePermitOrWait loop does not handle thread
interruptions; add an interrupt check inside the loop (e.g., if
(Thread.currentThread().isInterrupted()) { Thread.currentThread().interrupt();
return; }) so the method exits promptly when the thread is interrupted,
preserving the interrupt flag and preventing an endless wait.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ private MailSenderStrategy getValidStrategy(String strategyKey) {
return strategy;
}

public void acquirePermitOrWait(String strategyKey) {
MailSenderStrategy strategy = getValidStrategy(strategyKey);
strategy.acquirePermitOrWait();
}
Comment on lines +33 to +36
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

토큰 획득 시점은 Writer(실제 발송 직전) 배치가 적합합니다

Reader 단계에서 토큰을 확보하면 메시지가 없을 때도 토큰이 소모되어 실제 처리량이 줄 수 있습니다. MailWriter.send 직전에서 acquirePermitOrWait를 호출하도록 표준화하세요(Reader에서 호출 제거 권장).

🤖 Prompt for AI Agents
In
cs25-batch/src/main/java/com/example/cs25batch/sender/context/MailSenderContext.java
around lines 33 to 36, the call to acquirePermitOrWait is placed in the
Reader-phase; move token acquisition to the Writer-phase by removing any
Reader-side calls and call MailSenderContext.acquirePermitOrWait(strategyKey)
immediately before the actual send in MailWriter.send. Ensure MailWriter obtains
the MailSenderStrategy via getValidStrategy(strategyKey) (or calls
acquirePermitOrWait on MailSenderContext) right before dispatch, and delete the
Reader invocation to avoid consuming tokens for unread messages. Preserve
existing exception handling and semantics when acquirePermitOrWait blocks or
throws.

}