Conversation
…o refactor/338
…o refactor/338
…o refactor/338
WalkthroughRedisStreamReader가 Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Reader as RedisStreamReader
participant Ctx as MailSenderContext
participant Strat as MailSenderStrategy (impl)
participant Redis as RedisStreamsClient
Reader->>Ctx: acquirePermitOrWait(strategyKey)
Ctx->>Strat: acquirePermitOrWait()
Note right of Strat #fffbdd: 루프 - ConsumptionProbe로 대기시간 계산\n지터를 더해 parkNanos로 대기
Strat-->>Ctx: permit acquired
Ctx-->>Reader: ok
Reader->>Redis: readWithConsumerGroup(Duration)
Redis-->>Reader: msg (recordId, valueMap)
alt msg null or empty
Reader-->>Reader: return null
else valid
Reader-->>Reader: extract subscriptionId, recordId, valueMap
Reader-->>Reader: return Map
end
sequenceDiagram
autonumber
actor Caller as Mail flow
participant Strat as MailSenderStrategy impl
Caller->>Strat: acquirePermitOrWait()
loop until token available
Strat->>Strat: tryConsumeAndReturnRemaining(1) (ConsumptionProbe)
alt no token
Strat-->>Strat: compute waitNanos from probe + jitter
Strat-->>Strat: LockSupport.parkNanos(wait)
else token ok
Strat-->>Caller: proceed
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Assessment against linked issues
Assessment against linked issues: Out-of-scope changes
Possibly related PRs
Suggested reviewers
Poem
Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. 📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 💡 Knowledge Base configuration:
You can enable these sources in your CodeRabbit configuration. 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
cs25-batch/src/main/java/com/example/cs25batch/aop/MailLogAspect.java (1)
51-72: finally 블록 예외가 원인 예외를 덮어쓸 위험
finally내부의 DB 저장/Redis 전송 중 예외가 발생하면, 본래의 메일 발송 예외를 덮어쓰고 상위로 전파될 수 있습니다. 로깅 후 삼키거나(또는 서킷 브레이크/리트라이) 원인 예외를 우선시하도록 방어가 필요합니다.적용 예시(diff):
} finally { - MailLog mailLog = MailLog.builder() - .subscription(subscription) - .quiz(quiz) - .sendDate(LocalDateTime.now()) - .status(status) - .caused(caused) - .build(); - - mailLogRepository.save(mailLog); - mailLogRepository.flush(); - - if (status == MailStatus.FAILED) { - log.info("메일 발송 실패 : subscriptionId - {}, cause - {}", subscription.getId(), caused); - Map<String, String> retryMessage = Map.of( - "email", subscription.getEmail(), - "subscriptionId", subscription.getId().toString(), - "quizId", quiz.getId().toString() - ); - redisClient.addDlq("quiz-email-retry-stream", retryMessage); - } + try { + MailLog mailLog = MailLog.builder() + .subscription(subscription) + .quiz(quiz) + .sendDate(LocalDateTime.now()) + .status(status) + .caused(caused) + .build(); + mailLogRepository.save(mailLog); + mailLogRepository.flush(); + } catch (Exception logEx) { + log.error("메일 로그 저장 실패 - subscriptionId: {}", + subscription != null ? subscription.getId() : null, logEx); + } + if (status == MailStatus.FAILED) { + try { + log.warn("메일 발송 실패: subscriptionId={}, cause={}", subscription.getId(), caused); + Map<String, String> retryMessage = Map.of( + "email", subscription.getEmail(), + "subscriptionId", subscription.getId().toString(), + "quizId", quiz.getId().toString(), + "recordId", String.valueOf(mailDto.getRecordId()) + ); + redisClient.addDlq("quiz-email-retry-stream", retryMessage); + } catch (Exception dlqEx) { + log.error("메일 실패 DLQ 추가 실패 - subscriptionId: {}", subscription.getId(), dlqEx); + } + } }cs25-batch/src/main/java/com/example/cs25batch/batch/component/processor/MailConsumerAsyncProcessor.java (1)
38-41: 필터링(return null) 분기에서 레코드 미확인(ACK) 누락 → PEL 정체 가능성
!subscription.isActive()또는!subscription.isTodaySubscribed()일 때null을 반환하지만 ACK/DEL이 없어 PEL에 갇힐 수 있습니다. 재소비 루틴이 별도로 없다면 여기서도ackAndDel이 필요합니다.적용 예시(diff):
- if (!subscription.isActive() || !subscription.isTodaySubscribed()) { - return null; - } + if (!subscription.isActive() || !subscription.isTodaySubscribed()) { + log.info("구독 비활성/요일 불일치로 메시지 드랍 - subscriptionId={}, recordId={}", subscriptionId, recordId); + redisClient.ackAndDel(recordId); + return null; + }cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java (1)
32-58: 필터링된 항목(null 반환) 경로의 ACK/DEL 보장 필요(교차 파일 이슈)Processor가 구독 비활성/요일 불일치로 null을 반환하면 Writer가 호출되지 않아 ACK/DEL이 영구 누락될 수 있습니다. QuizException에는 ackAndDel이 있으나 다른 필터 조건에도 동일 정책 적용이 필요합니다(Processor에서 ackAndDel 수행 권장).
참고 수정 예시(파일: MailConsumerAsyncProcessor.java, 제공된 스니펫 기준):
// 구독 유효성 검증 직후 if (!subscription.isActive() || !subscription.isTodaySubscribed()) { redisClient.ackAndDel(recordId); // 필터링 시에도 ACK/DEL return null; }
🧹 Nitpick comments (18)
cs25-entity/src/main/java/com/example/cs25entity/domain/mail/enums/MailStatus.java (2)
5-6: 분기 폭발 방지: 상태(Status)와 실패 사유(Reason) 분리 고려업무별 실패 상태가
MailStatus에 지속적으로 추가되면 스위치/분기 복잡도가 커집니다. 상태는 최소화(SENT/FAILED 등)하고, 실패 사유는 별도MailFailureReason(예: QUIZ_VALIDATION_FAILED, RATE_LIMIT_EXCEEDED, TEMPLATE_ERROR 등)로 분리하는 설계를 권장합니다. 이는 확장성·가독성·클라이언트 호환성에 유리합니다.간단한 스케치:
public enum MailStatus { SENT, FAILED } public enum MailFailureReason { QUIZ_FAILED, RATE_LIMIT_EXCEEDED, TEMPLATE_ERROR, ... } // 엔티티 private MailStatus status; private MailFailureReason failureReason; // nullable
5-6: 테스트/문서 보강 제안
- 단위/통합 테스트: QUIZ_FAILED로 전이되는 시나리오(퀴즈 메일 처리 실패, DLQ 라우팅 등) 추가
- 계약 문서: OpenAPI/Swagger에
MailStatus가능한 값 업데이트 및 예시 응답 반영- 운영 관측: 대시보드/알람에 QUIZ_FAILED 카운트 추가
필요 시 테스트 스켈레톤/문서 패치 제안 드리겠습니다.
cs25-batch/src/main/java/com/example/cs25batch/sender/MailSenderStrategy.java (2)
8-8: tryConsume 시그니처: Long → long, num → permits로 명확화 제안Null 불필요(primitive로 충분) 및 의미 명확화를 위해 파라미터를
long permits로 바꾸는 편이 안전합니다. 구현체 전반에서 NPE 가능성을 제거하고 의도를 분명히 할 수 있습니다.적용 예시(diff):
- boolean tryConsume(Long num); + boolean tryConsume(long permits);
10-10: 차단 대기 메서드의 인터럽트/무한 대기 처리 계약 강화
acquirePermitOrWait()가 블로킹하는 API라면, 호출 스레드 인터럽트 시나리오 및 무한 대기 방지를 계약에 반영하는 것이 좋습니다. 최소한throws InterruptedException명시를 권장합니다. (또는 별도 오버로드로timeout지원)적용 예시(diff):
- void acquirePermitOrWait(); + void acquirePermitOrWait() throws InterruptedException;추가로 타임아웃 변형을 도입하고 싶다면(선택):
boolean acquirePermitOrWait(Duration timeout) throws InterruptedException- 구현체에서 false 반환 시 호출 측이 재시도/드롭 정책을 선택
cs25-batch/src/main/java/com/example/cs25batch/adapter/RedisStreamsClient.java (3)
25-35: 제네릭 타입을 String으로 구체화해 다운스트림 캐스팅 비용/실수를 방지
StringRedisTemplate을 사용하므로MapRecord<String, String, String>이 더 정확합니다. 현재Object는 불필요하게 넓습니다.적용 예시(diff):
- public MapRecord<String, Object, Object> readWithConsumerGroup(Duration blockTimeout) { + public MapRecord<String, String, String> readWithConsumerGroup(Duration blockTimeout) { @@ - List<MapRecord<String, Object, Object>> records = redisTemplate + List<MapRecord<String, String, String>> records = redisTemplate .opsForStream().read( Consumer.from(group, consumer), options, StreamOffset.create(stream, ReadOffset.lastConsumed()) );또한
blockTimeout이null일 경우 런타임 예외가 발생할 수 있으니 초기에 명시적 검증을 권장합니다(예:Objects.requireNonNull(blockTimeout, "blockTimeout")).
46-50: ack + del 네트워크 라운드 트립 최소화(파이프라이닝)
ack와del을 파이프라인으로 묶으면 왕복 횟수를 줄이고 처리량을 개선할 수 있습니다.적용 예시(diff):
public void ackAndDel(String recordId) { RecordId id = RecordId.of(recordId); - redisTemplate.opsForStream().acknowledge(stream, group, id); - redisTemplate.opsForStream().delete(stream, id); + redisTemplate.executePipelined(ops -> { + ops.opsForStream().acknowledge(stream, group, id); + ops.opsForStream().delete(stream, id); + return null; + }); }
52-54: DLQ 스트림 길이 무한 성장 방지 옵션 고려DLQ는 본질적으로 축적되므로
MAXLEN(근사 트림) 같은 트리밍 옵션을 붙여주는 편이 안전합니다. Spring Data Redis의XAddOptions.maxlen()을 사용해 길이 상한을 두는 것을 권장합니다(예: 100k).참고 구현(비-diff 스니펫):
import org.springframework.data.redis.connection.stream.Record; import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.connection.stream.XAddOptions; public void addDlq(String dlqStream, Map<String, String> message){ Record<String, Map<String, String>> record = StreamRecords.newRecord() .in(dlqStream) .ofMap(message); redisTemplate.opsForStream().add(record, XAddOptions.maxlen(100_000).approximateTrimming()); }cs25-batch/src/main/java/com/example/cs25batch/aop/MailLogAspect.java (2)
47-51: 원인 예외 스택 손실: CustomMailException 래핑 시 cause 연결현재
catch (Exception e)에서 원인 스택트레이스를 잃습니다. 디버깅·모니터링을 위해 cause를 연결해 던지는 것이 좋습니다(가능 시).적용 예시(diff):
- } catch (Exception e){ + } catch (Exception e){ status = MailStatus.FAILED; caused = e.getMessage(); - throw new CustomMailException(MailExceptionCode.EMAIL_SEND_FAILED_ERROR); + throw new CustomMailException(MailExceptionCode.EMAIL_SEND_FAILED_ERROR, e); }만약 해당 생성자가 없다면, 예외 로깅에서
e를 함께 남기는 방식으로라도 스택을 보존하는 것을 권장합니다.
65-71: DLQ 페이로드에 recordId 포함 권장재처리/트레이싱을 위해 스트림의 원본
recordId를 DLQ 메시지에 포함하는 것이 유용합니다. 위의 diff에 반영했습니다.cs25-batch/src/main/java/com/example/cs25batch/batch/component/processor/MailConsumerAsyncProcessor.java (2)
43-54: 퀴즈 생성 실패 시 처리는 적절하나, 추가 로깅 권장
QuizException에서ackAndDel(recordId)로 드랍하는 결정은 명확합니다. 운영 가시성을 위해 실패 사유와 대상 식별자를 남겨두면 좋습니다.적용 예시(diff):
- } catch(QuizException e){ - //문제 출제 실패로 인한 예외 발생 시, 기존 Queue에 있는 데이터 삭제 - redisClient.ackAndDel(recordId); - return null; + } catch (QuizException e) { + // 문제 출제 실패: 기존 Queue 데이터 삭제 후 드랍 + log.warn("퀴즈 출제 실패로 메시지 드랍 - subscriptionId={}, recordId={}, reason={}", + subscriptionId, recordId, e.getMessage()); + redisClient.ackAndDel(recordId); + return null; }
27-29: 입력 메시지 유효성 검증 요청: subscriptionId/recordId 누락 시 처리
message.get("subscriptionId")혹은message.get("recordId")가 누락되면 NPE/IAE가 발생합니다. 생성 측에서 항상 주입된다면 괜찮지만, 계약을 명시하거나 사전에 검증해 방어 코드를 추가하는 것을 권장합니다.권장 스니펫(비-diff):
import java.util.Objects; String recordId = Objects.requireNonNull(message.get("recordId"), "recordId is required"); Long subscriptionId = Long.valueOf(Objects.requireNonNull(message.get("subscriptionId"), "subscriptionId is required"));cs25-batch/src/main/java/com/example/cs25batch/config/RedisStreamsConfig.java (1)
3-19: 하드코딩된 스트림 구성 외부화 및 그룹 보장 생성환경별로 다른 값을 쓸 수 있도록 프로퍼티로 외부화하고, 컨슈머 그룹이 없을 때 생성하는 초기화 로직을 추가하는 것을 권장합니다. 운영에서 그룹 미존재로 인한 런타임 실패를 줄일 수 있습니다.
적용 예시(diff):
import com.example.cs25batch.adapter.RedisStreamsClient; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.connection.stream.ReadOffset; @@ @Bean - public RedisStreamsClient quizEmailStreamsClient(StringRedisTemplate redisTemplate) { - return new RedisStreamsClient( - redisTemplate, - "quiz-email-stream", // stream 이름 - "mail-consumer-group", // group - "mail-worker" // consumer - ); + public RedisStreamsClient quizEmailStreamsClient( + StringRedisTemplate redisTemplate, + @Value("${streams.quiz-email.stream:quiz-email-stream}") String stream, + @Value("${streams.quiz-email.group:mail-consumer-group}") String group, + @Value("${streams.quiz-email.consumer:mail-worker}") String consumer + ) { + // 컨슈머 그룹 보장(이미 있으면 BUSYGROUP 예외 무시) + try { + redisTemplate.opsForStream().createGroup(stream, ReadOffset.latest(), group); + } catch (Exception ignore) { /* no-op */ } + + return new RedisStreamsClient(redisTemplate, stream, group, consumer); }추가로 DLQ 스트림도 동일한 방식으로 프로퍼티화 및 그룹을 보장(필요 시)하는 구성을 고려해 주세요.
cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java (2)
35-37: tryConsume 인자의 유효성 방어 코드 추가 권장Long가 null/비양수로 들어올 수 있습니다. 최소 1 토큰을 소비하도록 방어하면 NPE/예상치 못한 동작을 예방할 수 있습니다(인터페이스가 Long라면 내부에서 보정).
- public boolean tryConsume(Long num){ - return bucket.tryConsume(num); - } + public boolean tryConsume(Long num){ + long n = (num == null || num <= 0) ? 1L : num; + return bucket.tryConsume(n); + }
16-28: 전략 간 중복 로직(버킷 설정/대기 루프) 공통화 제안Java/Ses 전략 모두 동일한 acquirePermitOrWait/방어 로직을 가집니다. 유틸(예: RateLimiterHelper)로 추출하면 정책 변경 시 일관성 유지가 쉬워집니다.
cs25-batch/src/main/java/com/example/cs25batch/sender/SesMailSenderStrategy.java (2)
36-38: tryConsume 입력 방어 추가 제안null/비양수 입력 방지용 보정 로직을 추가해주세요(JavaMail과 동일).
- public boolean tryConsume(Long num){ - return bucket.tryConsume(num); - } + public boolean tryConsume(Long num){ + long n = (num == null || num <= 0) ? 1L : num; + return bucket.tryConsume(n); + }
16-28: 구성 중복 공통화 제안두 전략의 버킷 설정/대기 로직은 동일 패턴입니다. 공통 헬퍼로 추출하여 일관성 유지 및 테스트 용이성을 높일 수 있습니다.
cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java (2)
36-40: 주석 처리된 레거시 토큰 루프는 제거하는 편이 가독성에 유리합니다히스토리 관리가 Git으로 가능하므로 주석 코드는 과감히 삭제해주세요.
-/* - while (!mailSenderContext.tryConsume(strategyKey, 1L)) { - Thread.sleep(200); //토큰을 얻을 때까지 간격을 두고 재시도 - } -*/
5-7: 사용되지 않는 import 정리 권장Bucket/ConsumptionProbe/LockSupport 관련 import는 본 클래스에서 사용하지 않습니다. 불필요 import를 제거하면 경고 감소 및 가독성 향상에 도움이 됩니다.
-import io.github.bucket4j.Bucket; -import io.github.bucket4j.ConsumptionProbe; +// (미사용) bucket4j 관련 import 제거 @@ -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; +// (미사용) 대기 관련 import 제거Also applies to: 11-13
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (11)
cs25-batch/src/main/java/com/example/cs25batch/adapter/RedisStreamsClient.java(1 hunks)cs25-batch/src/main/java/com/example/cs25batch/aop/MailLogAspect.java(3 hunks)cs25-batch/src/main/java/com/example/cs25batch/batch/component/processor/MailConsumerAsyncProcessor.java(3 hunks)cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java(1 hunks)cs25-batch/src/main/java/com/example/cs25batch/batch/component/writer/MailWriter.java(3 hunks)cs25-batch/src/main/java/com/example/cs25batch/config/RedisStreamsConfig.java(1 hunks)cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java(3 hunks)cs25-batch/src/main/java/com/example/cs25batch/sender/MailSenderStrategy.java(1 hunks)cs25-batch/src/main/java/com/example/cs25batch/sender/SesMailSenderStrategy.java(3 hunks)cs25-batch/src/main/java/com/example/cs25batch/sender/context/MailSenderContext.java(2 hunks)cs25-entity/src/main/java/com/example/cs25entity/domain/mail/enums/MailStatus.java(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java (7)
cs25-batch/src/main/java/com/example/cs25batch/sender/SesMailSenderStrategy.java (1)
RequiredArgsConstructor(16-53)cs25-batch/src/main/java/com/example/cs25batch/adapter/RedisStreamsClient.java (1)
RequiredArgsConstructor(16-55)cs25-batch/src/main/java/com/example/cs25batch/batch/component/processor/MailConsumerAsyncProcessor.java (1)
Slf4j(16-58)cs25-batch/src/main/java/com/example/cs25batch/batch/component/writer/MailWriter.java (1)
Slf4j(13-40)cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java (1)
Component(16-52)cs25-batch/src/main/java/com/example/cs25batch/sender/context/MailSenderContext.java (1)
Component(10-37)cs25-batch/src/main/java/com/example/cs25batch/config/RedisConsumerGroupInitalizer.java (1)
Component(10-27)
🔇 Additional comments (9)
cs25-entity/src/main/java/com/example/cs25entity/domain/mail/enums/MailStatus.java (3)
5-6: 추가 자체는 합리적입니다퀴즈 메일 처리 파이프라인(DLQ 등)에서 일반 실패와 구분된 시그널이 필요한 요구를 충족합니다. 위의 호환성 점검만 완료되면 도입에 동의합니다.
5-6: QUIZ_FAILED 추가 영향 검증 결과
ORDINAL 매핑 사용 여부
@Enumerated(EnumType.STRING)만 사용 중이며,EnumType.ORDINAL은 전혀 사용되지 않습니다. 신규 값 추가로 인한 DB 순서 변경 이슈는 없습니다.switch 문
switch(…MailStatus…)구문은 코드베이스에 존재하지 않아,QUIZ_FAILED추가로 분기 로직을 수정할 곳이 없습니다.하드코딩된
MailStatus.FAILED분기
cs25-batch/src/main/java/com/example/cs25batch/aop/MailLogAspect.java
- 48행:
status = MailStatus.FAILED;- 63행:
if (status == MailStatus.FAILED) { … }
해당 로직은 메일 전송 실패 처리 전용이므로,QUIZ_FAILED와는 무관합니다. 별도 수정 불필요합니다.직렬화/역직렬화
@Enumerated(EnumType.STRING)로 문자열 매핑 중이므로, Jackson이 새 enum 값을 자동으로 처리합니다.
다만, 프론트엔드·모바일·외부 파트너 쪽 클라이언트는 새 값(QUIZ_FAILED) 수용 여부 확인이 필요합니다.API 스펙 및 문서
OpenAPI 스펙이나 YAML/문서에MailStatusenum 목록이 별도 정의된 부분이 보이지 않습니다.
– 컨트롤러 또는 스펙 파일(.yaml/.yml)이 있다면QUIZ_FAILED반영을 검토해주세요.테스트 커버리지
MailLogServiceTest에서는MailStatus.SENT만 검증 중입니다.
–QUIZ_FAILED시나리오 테스트 추가를 권장합니다.요청 사항
- 프론트·모바일·외부 파트너가 새 enum 값을 수용할 수 있도록 클라이언트 및 API 문서(또는 OpenAPI 스펙) 검토·반영
QUIZ_FAILED관련 비즈니스 로직(예: 퀴즈 실패 시 로그 기록 또는 후속 처리) 위치가 별도 있다면, 해당 코드에서 새 enum 값이 올바르게 처리되는지 확인- 필요한 경우
QUIZ_FAILED시나리오를 위한 단위/통합 테스트 추가
5-6: ORDINAL 매핑 위험 없음 — 이미 STRING 매핑 사용 중입니다
MailLog엔티티의status필드에 이미@Enumerated(EnumType.STRING)이 적용되어 있습니다 (cs25-entity/src/main/java/com/example/cs25entity/domain/mail/entity/MailLog.java:42).User,Quiz엔티티에서도 모든@Enumerated매핑이EnumType.STRING으로 일관되게 설정되어 있습니다.따라서
EnumType.ORDINAL전환에 따른 데이터 오염 우려는 해당 코드베이스에 적용되지 않으며, 본 리뷰 코멘트는 무시하셔도 좋습니다.Likely an incorrect or invalid review comment.
cs25-batch/src/main/java/com/example/cs25batch/adapter/RedisStreamsClient.java (1)
27-33: 소비 오프셋 의미 확인: lastConsumed(= ‘>’)로 신규 메시지만 읽음
ReadOffset.lastConsumed()는 컨슈머 그룹에서 신규(미전달) 메시지만 소비합니다(내부적으로XREADGROUP ... '>'). PEL 재처리 요구사항(예: 장애 후 미확인 메시지 클레임)이 있다면 별도의 ‘pending’/‘claim’ 경로가 필요합니다. 운영 의도와 일치하는지 확인 바랍니다.cs25-batch/src/main/java/com/example/cs25batch/batch/component/writer/MailWriter.java (2)
3-3: Redis Streams 접근을 전용 클라이언트로 추상화한 방향, 좋습니다삭제/ACK 로직이 한 곳으로 모여 테스트성과 변경 용이성이 올라갑니다. MailWriter에서의 책임도 명확해졌습니다.
18-20: MailWriter의 RedisStreamsClient 주입에는 현재 모호성이 없습니다스크립트 실행 결과
RedisStreamsConfig에서 정의된RedisStreamsClient빈은 오직quizEmailStreamsClient하나뿐이므로, 타입 기반 주입 시 유일하게 매칭되어 모호성이 발생하지 않습니다.
따라서 현 시점에서는@Qualifier를 명시적으로 추가할 필요가 없습니다.
– 앞으로 동일 타입의 빈을 추가 등록할 계획이 있다면, 충돌 방지를 위해 해당 시점에@Qualifier("quizEmailStreamsClient")등을 도입하는 것을 고려하시기 바랍니다.cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java (1)
24-24: 리필 주기 변경(500ms→1000ms 동량 토큰)은 버스트 특성에 영향을 줍니다 — 의도 확인 요청초당 처리량은 동일(4/s)이지만, 1초 단위 리필은 짧은 버스트에 불리/유리할 수 있습니다. 운영 트래픽 패턴과 맞는지 확인 부탁드립니다. 필요 시 capacity를 늘리고 consume 단위 조정 등으로 스파이크 수용력을 조절할 수 있습니다.
cs25-batch/src/main/java/com/example/cs25batch/sender/context/MailSenderContext.java (1)
20-23: 전략 위임을 통한 토큰 소비 시그널로의 전환, 방향성 좋습니다Bucket 노출 제거로 컨텍스트의 응집도가 좋아졌고, 전략별 정책 캡슐화가 명확해졌습니다.
cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java (1)
43-47: null/empty 메시지에 대해 null 반환 시 Step 조기 종료 가능성 — 잡 구성 의도 확인 요청Spring Batch의 ItemReader.read()가 null을 반환하면 일반적으로 입력 종료로 간주되어 Step이 끝납니다. 스트리밍 소비가 의도라면 null 대신 재시도 루프(백오프 포함)로 전환하거나, 잡/스텝 구성이 주기성 실행임을 명확히 해야 합니다. 의도 확인 부탁드립니다.
선택지(참고용, 팀 컨벤션에 맞춰 조정):
- MapRecord<String, Object, Object> msg = redisClient.readWithConsumerGroup(Duration.ofMillis(500)); - //redisTemplate.opsForStream().acknowledge(STREAM, GROUP, msg.getId()); - - if(msg == null || msg.getValue().isEmpty()) return null; + MapRecord<String, Object, Object> msg; + do { + msg = redisClient.readWithConsumerGroup(Duration.ofMillis(500)); + if (Thread.currentThread().isInterrupted()) return null; // 취소 신호 존중 + } while (msg == null || msg.getValue().isEmpty());
| Thread.sleep(200); //토큰을 얻을 때까지 간격을 두고 재시도 | ||
| } | ||
| */ | ||
| mailSenderContext.acquirePermitOrWait(strategyKey); |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Reader에서 토큰을 선점하면 ‘메시지 없음’에도 토큰이 소모됩니다 — 토큰 획득 호출 제거 권장(Writer로 이동)
현재 acquirePermitOrWait는 토큰을 실제로 소비합니다(tryConsumeAndReturnRemaining 사용). 메시지가 없거나 빈 레코드일 때도 처리량을 깎는 문제가 생깁니다. 토큰 획득은 MailWriter에서 실제 발송 직전에 수행하세요.
- mailSenderContext.acquirePermitOrWait(strategyKey);
+ // 토큰 획득은 MailWriter에서 실제 발송 직전에 수행MailWriter 측 반영은 별도 코멘트의 diff를 참고하세요.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| mailSenderContext.acquirePermitOrWait(strategyKey); | |
| // 토큰 획득은 MailWriter에서 실제 발송 직전에 수행 |
🤖 Prompt for AI Agents
In
cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java
around line 41, remove the call to
mailSenderContext.acquirePermitOrWait(strategyKey) from the reader because it
consumes a token even when there are no messages (tryConsumeAndReturnRemaining
behavior); instead, ensure token acquisition is performed in MailWriter
immediately before sending each mail (per the provided MailWriter diff). Update
the reader to not call or check permits at all, and confirm MailWriter calls
acquirePermitOrWait(strategyKey) right before the actual send so tokens are only
consumed for real send attempts.
cs25-batch/src/main/java/com/example/cs25batch/batch/component/writer/MailWriter.java
Show resolved
Hide resolved
| public void acquirePermitOrWait(String strategyKey) { | ||
| MailSenderStrategy strategy = getValidStrategy(strategyKey); | ||
| strategy.acquirePermitOrWait(); | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion
토큰 획득 시점은 Writer(실제 발송 직전) 배치가 적합합니다
Reader 단계에서 토큰을 확보하면 메시지가 없을 때도 토큰이 소모되어 실제 처리량이 줄 수 있습니다. MailWriter.send 직전에서 acquirePermitOrWait를 호출하도록 표준화하세요(Reader에서 호출 제거 권장).
🤖 Prompt for AI Agents
In
cs25-batch/src/main/java/com/example/cs25batch/sender/context/MailSenderContext.java
around lines 33 to 36, the call to acquirePermitOrWait is placed in the
Reader-phase; move token acquisition to the Writer-phase by removing any
Reader-side calls and call MailSenderContext.acquirePermitOrWait(strategyKey)
immediately before the actual send in MailWriter.send. Ensure MailWriter obtains
the MailSenderStrategy via getValidStrategy(strategyKey) (or calls
acquirePermitOrWait on MailSenderContext) right before dispatch, and delete the
Reader invocation to avoid consuming tokens for unread messages. Preserve
existing exception handling and semantics when acquirePermitOrWait blocks or
throws.
cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java
Show resolved
Hide resolved
| public void acquirePermitOrWait(){ | ||
| while (true) { | ||
| ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1); | ||
| if (probe.isConsumed()) return; | ||
|
|
||
| long nanos = probe.getNanosToWaitForRefill(); | ||
| long jitter = TimeUnit.MILLISECONDS.toNanos( | ||
| ThreadLocalRandom.current().nextInt(0, 50) | ||
| ); | ||
| LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1))); | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion
acquirePermitOrWait에서도 인터럽트 신호 처리 추가 권장
JavaMail 전략과 동일한 이유로 인터럽트 플래그를 확인하여 루프를 탈출하세요.
@Override
public void acquirePermitOrWait(){
while (true) {
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) return;
long nanos = probe.getNanosToWaitForRefill();
long jitter = TimeUnit.MILLISECONDS.toNanos(
ThreadLocalRandom.current().nextInt(0, 50)
);
LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1)));
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| public void acquirePermitOrWait(){ | |
| while (true) { | |
| ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1); | |
| if (probe.isConsumed()) return; | |
| long nanos = probe.getNanosToWaitForRefill(); | |
| long jitter = TimeUnit.MILLISECONDS.toNanos( | |
| ThreadLocalRandom.current().nextInt(0, 50) | |
| ); | |
| LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1))); | |
| } | |
| @Override | |
| public void acquirePermitOrWait(){ | |
| while (true) { | |
| if (Thread.currentThread().isInterrupted()) { | |
| return; | |
| } | |
| ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1); | |
| if (probe.isConsumed()) return; | |
| long nanos = probe.getNanosToWaitForRefill(); | |
| long jitter = TimeUnit.MILLISECONDS.toNanos( | |
| ThreadLocalRandom.current().nextInt(0, 50) | |
| ); | |
| LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1))); | |
| } | |
| } |
🤖 Prompt for AI Agents
In
cs25-batch/src/main/java/com/example/cs25batch/sender/SesMailSenderStrategy.java
around lines 41 to 51, the acquirePermitOrWait loop does not handle thread
interruptions; add an interrupt check inside the loop (e.g., if
(Thread.currentThread().isInterrupted()) { Thread.currentThread().interrupt();
return; }) so the method exits promptly when the thread is interrupted,
preserving the interrupt flag and preventing an endless wait.
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java (1)
37-41: 성공 처리 시 ACK 호출 누락 — 메일 발송 완료 후 Redis Stream ACK 추가 필요현재
MailConsumerAsyncProcessor.java에서 Redis Stream 메시지 ACK가 예외 처리(QuizException 발생) 블록 내의redisClient.ackAndDel(recordId)호출로만 이루어지고 있어, 정상적으로 메일 발송이 완료된 경우에는 ACK가 전혀 호출되지 않습니다. 이로 인해 성공 처리된 메시지는 PEL에 남아 재처리 또는 클레임 이슈를 일으킬 수 있습니다.조치 사항:
- MailConsumerAsyncProcessor.java (cs25-batch/src/main/java/com/example/cs25batch/batch/component/processor/MailConsumerAsyncProcessor.java)
- try 블록 내 메일 발송 로직 직후에
redisClient.ack(recordId)호출 추가
→ 성공 시 해당 레코드를 ACK하여 PEL에서 제거하도록 보장- catch 블록에서의
ackAndDel호출이 실패 메시지의 삭제 용도라면 유지하되, 의도치 않은 데이터 손실이 없도록 로직 검토
♻️ Duplicate comments (2)
cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java (1)
35-35: Reader에서 토큰 선점하면 ‘메시지 없음’에도 토큰이 소모됩니다 — 토큰 획득은 Writer(실제 발송 직전)로 이동 권장현재
mailSenderContext.acquirePermitOrWait(strategyKey)를 Reader 단계에서 호출하여 메시지를 읽기 전 토큰을 소비합니다. 메시지가 없거나 빈 레코드(null/empty)인 경우에도 토큰이 깎여 처리량이 왜곡됩니다. 토큰 선점은 실제 외부 발송 직전에만 수행되도록 MailWriter로 이동하세요. 최소 완화책으로는 “메시지를 읽은 뒤 null/empty가 아님을 확인한 다음” 토큰을 획득하는 순서로 바꾸되, 이 방식은 메시지를 홀딩한 채 대기 시간을 가질 수 있어 권장 1안은 여전히 Writer로 이동입니다.적용 예(Reader에서 호출 제거):
- mailSenderContext.acquirePermitOrWait(strategyKey);Writer 측(예: 실제
send직전)에서:mailSenderContext.acquirePermitOrWait(strategyKey); mailSender.send(...);cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java (1)
41-51: 인터럽트 무시로 배치 취소가 지연될 수 있음 — 인터럽트 플래그를 존중하세요현재 무한 루프에서 스레드 인터럽트를 확인하지 않아 Step 취소 시에도 대기가 지속될 수 있습니다. 루프 입구와
parkNanos직후에 인터럽트를 확인해 즉시 반환하도록 수정하세요.적용 예:
@Override public void acquirePermitOrWait(){ - while (true) { + while (true) { + if (Thread.currentThread().isInterrupted()) { + return; // 취소/중단 신호 존중 + } ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1); if (probe.isConsumed()) return; long nanos = probe.getNanosToWaitForRefill(); long jitter = TimeUnit.MILLISECONDS.toNanos( ThreadLocalRandom.current().nextInt(0, 50) ); LockSupport.parkNanos(Math.min(nanos + jitter, TimeUnit.SECONDS.toNanos(1))); + if (Thread.currentThread().isInterrupted()) { + return; // 대기 중 인터럽트 발생 시 즉시 탈출 + } } }
🧹 Nitpick comments (2)
cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java (1)
27-27: 불필요한throws InterruptedException제거 권장
read()는 현재 내부에서 체크 예외를 던지지 않습니다(대기는LockSupport.parkNanos계열로 처리). 시그니처의throws InterruptedException를 제거해 호출부 불필요 예외 처리를 줄이는 것이 깔끔합니다. 만약 인터럽트를 존중하려면 내부에서Thread.currentThread().isInterrupted()체크 후 조용히return null하는 패턴을 고려하세요.적용 예:
- public Map<String, String> read() throws InterruptedException { + public Map<String, String> read() {cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java (1)
20-27: 레이트 한도 하드코딩 제거 제안 — 환경별 조정 가능하도록 외부화
capacity(4)/refillGreedy(2, 500ms)는 하드코딩되어 환경별 튜닝이 어렵습니다. 속성 외부화(@Value또는 설정 프로퍼티 바인딩)로 운영/스테이징/개발 환경에 맞춰 쉽게 조정 가능하도록 하는 것을 권장합니다.예시(파일 외부 변경 포함):
@Value("${rate.java.capacity:4}") private int capacity; @Value("${rate.java.refillTokens:2}") private int refillTokens; @Value("${rate.java.refillMs:500}") private long refillMs; // ... private final Bucket bucket = Bucket.builder() .addLimit(Bandwidth.builder() .capacity(capacity) .refillGreedy(refillTokens, Duration.ofMillis(refillMs)) .build()) .build();
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
cs25-batch/src/main/java/com/example/cs25batch/batch/component/reader/RedisStreamReader.java(1 hunks)cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java(3 hunks)cs25-batch/src/main/java/com/example/cs25batch/sender/MailSenderStrategy.java(1 hunks)
🔇 Additional comments (2)
cs25-batch/src/main/java/com/example/cs25batch/sender/JavaMailSenderStrategy.java (2)
34-37: 현재 인터페이스 정의와 @OverRide 불일치 가능 — 인터페이스 수정 전까지 컴파일 상태 확인 필요현 시점 인터페이스가
tryConsume를 선언하지 않으면 이@Override는 컴파일 오류가 됩니다. 앞선 인터페이스 수정안을 적용하면 해결됩니다. 적용 전이라면 임시로@Override제거가 필요할 수 있습니다.
34-37: 이중 소비(Dual consume) 가능성 수동 검증 요청
- 자동화 스크립트(
rg)로는 동일 컨텍스트에서tryConsume직후acquirePermitOrWait호출 패턴이 탐지되지 않았습니다.
하지만 오탐 및 누락 가능성이 있으므로, 실제 상위 컨텍스트(MailSenderContext등)에서 아래 사항을 코드와 테스트로 명확히 보장해 주세요:
•tryConsume실패 시에만acquirePermitOrWait호출
• 호출 규칙을 검증하는 단위/통합 테스트 추가- 메서드 시그니처 최적화: 불필요한 오토박싱 제거를 위해
Long→long변경 권장부분 시그니처 개선 예시:
- public boolean tryConsume(Long num){ + public boolean tryConsume(long num){ return bucket.tryConsume(num); }
cs25-batch/src/main/java/com/example/cs25batch/sender/MailSenderStrategy.java
Show resolved
Hide resolved
…o refactor/338
🔎 작업 내용
closed #338
Summary by CodeRabbit