diff --git a/cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java b/cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java index 5e69d282..981624eb 100644 --- a/cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java +++ b/cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java @@ -2,22 +2,14 @@ import com.example.cs25batch.adapter.RedisStreamsClient; import com.example.cs25batch.sender.context.MailSenderContext; -import io.github.bucket4j.Bucket; import java.time.Duration; import java.util.HashMap; -import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.item.ItemReader; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; -import org.springframework.data.redis.connection.stream.ReadOffset; -import org.springframework.data.redis.connection.stream.StreamOffset; -import org.springframework.data.redis.connection.stream.StreamReadOptions; -import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; @Slf4j @@ -35,9 +27,12 @@ public class RedisStreamReader implements ItemReader> { public Map read() throws InterruptedException { //long start = System.currentTimeMillis(); +/* while (!mailSenderContext.tryConsume(strategyKey, 1L)) { Thread.sleep(200); //토큰을 얻을 때까지 간격을 두고 재시도 } +*/ + mailSenderContext.acquirePermitOrWait(strategyKey); MapRecord msg = redisClient.readWithConsumerGroup(Duration.ofMillis(500)); //redisTemplate.opsForStream().acknowledge(STREAM, GROUP, msg.getId()); diff --git a/cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java b/cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java index f949df1f..8fc3039e 100644 --- a/cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java +++ b/cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java @@ -4,6 +4,10 @@ import com.example.cs25batch.batch.service.JavaMailService; import io.github.bucket4j.Bandwidth; import io.github.bucket4j.Bucket; +import io.github.bucket4j.ConsumptionProbe; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; @@ -17,7 +21,7 @@ public class JavaMailSenderStrategy implements MailSenderStrategy{ .addLimit( Bandwidth.builder() .capacity(4) - .refillGreedy(4, Duration.ofMillis(1000)) + .refillGreedy(2, Duration.ofMillis(500)) .build() ) .build(); @@ -31,4 +35,18 @@ public void sendQuizMail(MailDto mailDto) { public boolean tryConsume(Long num){ return bucket.tryConsume(num); } + + @Override + public void acquirePermitOrWait(){ + while (true) { + ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1); + if (probe.isConsumed()) return; + + long nanos = probe.getNanosToWaitForRefill(); + long jitter = TimeUnit.MILLISECONDS.toNanos( + ThreadLocalRandom.current().nextInt(0, 50) + ); + LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1))); + } + } } diff --git a/cs25-batch/src/main/java/com/example/cs25batch/sender/MailSenderStrategy.java b/cs25-batch/src/main/java/com/example/cs25batch/sender/MailSenderStrategy.java index 2171e5d5..ecac20a3 100644 --- a/cs25-batch/src/main/java/com/example/cs25batch/sender/MailSenderStrategy.java +++ b/cs25-batch/src/main/java/com/example/cs25batch/sender/MailSenderStrategy.java @@ -1,9 +1,12 @@ package com.example.cs25batch.sender; import com.example.cs25batch.batch.dto.MailDto; +import io.github.bucket4j.Bucket; public interface MailSenderStrategy { void sendQuizMail(MailDto mailDto); boolean tryConsume(Long num); + + void acquirePermitOrWait(); } diff --git a/cs25-batch/src/main/java/com/example/cs25batch/sender/SesMailSenderStrategy.java b/cs25-batch/src/main/java/com/example/cs25batch/sender/SesMailSenderStrategy.java index 8d4318ad..34be43ae 100644 --- a/cs25-batch/src/main/java/com/example/cs25batch/sender/SesMailSenderStrategy.java +++ b/cs25-batch/src/main/java/com/example/cs25batch/sender/SesMailSenderStrategy.java @@ -4,6 +4,10 @@ import com.example.cs25batch.batch.service.SesMailService; import io.github.bucket4j.Bandwidth; import io.github.bucket4j.Bucket; +import io.github.bucket4j.ConsumptionProbe; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; @@ -32,4 +36,18 @@ public void sendQuizMail(MailDto mailDto) { public boolean tryConsume(Long num){ return bucket.tryConsume(num); } + + @Override + public void acquirePermitOrWait(){ + while (true) { + ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1); + if (probe.isConsumed()) return; + + long nanos = probe.getNanosToWaitForRefill(); + long jitter = TimeUnit.MILLISECONDS.toNanos( + ThreadLocalRandom.current().nextInt(0, 50) + ); + LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1))); + } + } } diff --git a/cs25-batch/src/main/java/com/example/cs25batch/sender/context/MailSenderContext.java b/cs25-batch/src/main/java/com/example/cs25batch/sender/context/MailSenderContext.java index 6b30dc6e..dc1e3382 100644 --- a/cs25-batch/src/main/java/com/example/cs25batch/sender/context/MailSenderContext.java +++ b/cs25-batch/src/main/java/com/example/cs25batch/sender/context/MailSenderContext.java @@ -30,4 +30,8 @@ private MailSenderStrategy getValidStrategy(String strategyKey) { return strategy; } + public void acquirePermitOrWait(String strategyKey) { + MailSenderStrategy strategy = getValidStrategy(strategyKey); + strategy.acquirePermitOrWait(); + } }