diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..85aa7133 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,6 @@ +.gradle +target +out +node_modules +*.iml +*.log diff --git a/.gitignore b/.gitignore index af714193..67a764d1 100644 --- a/.gitignore +++ b/.gitignore @@ -64,4 +64,9 @@ grafana-data prometheus-data docker-compose.yml prometheus.yml +redis-data/dump.rdb .my.cnf +src/main/resources/application-stage.yml +k6/k6_result.json +k6/settlement_test.js +ngrinder-controller \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 3e1ec5e6..4e03829d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,9 @@ -# 사용할 base 이미지 선택 -FROM openjdk:21 +FROM eclipse-temurin:21-jre +WORKDIR /app -# build/libs/ 에 있는 jar 파일을 JAR_FILE 변수에 저장 -ARG JAR_FILE=build/libs/*.jar - -# JAR_FILE을 app.jar로 복사 -COPY ${JAR_FILE} app.jar +# 실행 가능한 fat jar만 복사 +COPY build/libs/onlyone-0.0.1-SNAPSHOT.jar app.jar EXPOSE 8080 -ENTRYPOINT ["java", "-jar", "-Duser.timezone=Asia/Seoul", "app.jar"] \ No newline at end of file +ENTRYPOINT ["java","--enable-preview","-jar","app.jar"] + diff --git a/build.gradle b/build.gradle index d096cb6d..75b55dd8 100644 --- a/build.gradle +++ b/build.gradle @@ -38,38 +38,50 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-validation' implementation 'org.springframework.boot:spring-boot-starter-logging' implementation 'org.springdoc:springdoc-openapi-starter-webmvc-ui:2.7.0' + + // Lombok (컴파일 + 테스트 모두 지원) compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' + testCompileOnly 'org.projectlombok:lombok' + testAnnotationProcessor 'org.projectlombok:lombok' + developmentOnly 'org.springframework.boot:spring-boot-devtools' runtimeOnly 'com.mysql:mysql-connector-j' runtimeOnly 'com.h2database:h2' - annotationProcessor 'org.projectlombok:lombok' + testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.security:spring-security-test' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' testImplementation 'com.h2database:h2' + + // Jwt implementation 'io.jsonwebtoken:jjwt-api:0.11.5' runtimeOnly 'io.jsonwebtoken:jjwt-impl:0.11.5' runtimeOnly 'io.jsonwebtoken:jjwt-jackson:0.11.5' - // Jwt implementation group: 'com.auth0', name: 'java-jwt', version: '3.14.0' - // Oauth2 implementation 'org.springframework.boot:spring-boot-starter-oauth2-client' implementation 'io.jsonwebtoken:jjwt-api:0.12.4' runtimeOnly 'io.jsonwebtoken:jjwt-impl:0.12.4' runtimeOnly 'io.jsonwebtoken:jjwt-jackson:0.12.4' + // OpenFeign implementation 'org.springframework.cloud:spring-cloud-starter-openfeign' implementation 'io.github.openfeign:feign-jackson' - //websocket + + // websocket, actuator implementation 'org.springframework.boot:spring-boot-starter-websocket' implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'org.springframework.boot:spring-boot-starter' - //fcm + + // fcm implementation 'com.google.firebase:firebase-admin:9.5.0' + // S3 implementation 'software.amazon.awssdk:s3:2.32.11' + // Redis implementation 'org.springframework.boot:spring-boot-starter-data-redis' + // Elasticsearch implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch' // Spring Retry @@ -77,6 +89,7 @@ dependencies { implementation 'org.springframework:spring-aspects' testImplementation 'org.testcontainers:junit-jupiter' testImplementation 'org.testcontainers:testcontainers' + // jpa 쿼리 확인 implementation 'com.github.gavlyukovskiy:p6spy-spring-boot-starter:1.9.0' @@ -89,6 +102,9 @@ dependencies { // 모니터링 implementation 'org.springframework.boot:spring-boot-starter-actuator' runtimeOnly 'io.micrometer:micrometer-registry-prometheus' + + // kafka + implementation 'org.springframework.kafka:spring-kafka' } // QueryDSL 설정 (수정됨) @@ -96,6 +112,7 @@ def querydslDir = layout.buildDirectory.dir('generated/querydsl') tasks.withType(JavaCompile).configureEach { options.generatedSourceOutputDirectory = querydslDir + options.compilerArgs += ["--enable-preview"] // ✅ StructuredTaskScope 활성화 } sourceSets { @@ -110,10 +127,11 @@ clean { delete querydslDir } -// 나머지는 기존과 동일 +// test task preview 옵션 추가 tasks.named('test') { useJUnitPlatform() finalizedBy jacocoTestReport + jvmArgs += "--enable-preview" // ✅ test 시 preview 활성화 // notification 관련 테스트 임시 제외 exclude '**/AppNotificationControllerTest.class' @@ -121,6 +139,11 @@ tasks.named('test') { exclude '**/AppNotificationTypeRepositoryTest.class' } +// bootRun도 preview 활성화 +tasks.named("bootRun") { + jvmArgs("--enable-preview") +} + jacoco { toolVersion = "0.8.13" } diff --git a/src/main/java/com/example/onlyone/domain/payment/service/PaymentService.java b/src/main/java/com/example/onlyone/domain/payment/service/PaymentService.java index 0119bbb6..6abc9fe3 100644 --- a/src/main/java/com/example/onlyone/domain/payment/service/PaymentService.java +++ b/src/main/java/com/example/onlyone/domain/payment/service/PaymentService.java @@ -98,7 +98,7 @@ public ConfirmTossPayResponse confirm(ConfirmTossPayRequest req) { Wallet wallet = walletRepository.findByUser(user) .orElseThrow(() -> new CustomException(ErrorCode.WALLET_NOT_FOUND)); - int amount = Math.toIntExact(req.getAmount()); + Long amount = req.getAmount(); wallet.updateBalance(wallet.getPostedBalance() + amount); WalletTransaction walletTransaction = payment.getWalletTransaction(); @@ -195,7 +195,7 @@ public void reportFail(ConfirmTossPayRequest req) { WalletTransaction failTx = WalletTransaction.builder() .type(Type.CHARGE) - .amount(Math.toIntExact(req.getAmount())) + .amount(req.getAmount()) .balance(wallet.getPostedBalance()) .walletTransactionStatus(WalletTransactionStatus.FAILED) .wallet(wallet) diff --git a/src/main/java/com/example/onlyone/domain/schedule/dto/request/ScheduleRequestDto.java b/src/main/java/com/example/onlyone/domain/schedule/dto/request/ScheduleRequestDto.java index 42848c09..57cec400 100644 --- a/src/main/java/com/example/onlyone/domain/schedule/dto/request/ScheduleRequestDto.java +++ b/src/main/java/com/example/onlyone/domain/schedule/dto/request/ScheduleRequestDto.java @@ -19,7 +19,7 @@ public class ScheduleRequestDto { private String location; @NotNull @Min(value = 0, message = "정기 모임 금액은 0원 이상이어야 합니다.") - private int cost; + private Long cost; @NotNull @Min(value = 1, message = "정기 모임 정원은 1명 이상이어야 합니다.") @Max(value = 100, message = "정기 모임 정원은 100명 이하여야 합니다.") diff --git a/src/main/java/com/example/onlyone/domain/schedule/dto/response/ScheduleDetailResponseDto.java b/src/main/java/com/example/onlyone/domain/schedule/dto/response/ScheduleDetailResponseDto.java index 781d625e..2d7b1357 100644 --- a/src/main/java/com/example/onlyone/domain/schedule/dto/response/ScheduleDetailResponseDto.java +++ b/src/main/java/com/example/onlyone/domain/schedule/dto/response/ScheduleDetailResponseDto.java @@ -15,7 +15,7 @@ public class ScheduleDetailResponseDto { private Long scheduleId; private String name; private LocalDateTime scheduleTime; - private int cost; + private Long cost; private int userLimit; private String location; diff --git a/src/main/java/com/example/onlyone/domain/schedule/dto/response/ScheduleResponseDto.java b/src/main/java/com/example/onlyone/domain/schedule/dto/response/ScheduleResponseDto.java index 3e1c3368..b0e8375a 100644 --- a/src/main/java/com/example/onlyone/domain/schedule/dto/response/ScheduleResponseDto.java +++ b/src/main/java/com/example/onlyone/domain/schedule/dto/response/ScheduleResponseDto.java @@ -16,7 +16,7 @@ public class ScheduleResponseDto { private String name; private ScheduleStatus scheduleStatus; private LocalDateTime scheduleTime; - private int cost; + private Long cost; private int userLimit; private int userCount; private boolean isJoined; diff --git a/src/main/java/com/example/onlyone/domain/schedule/entity/Schedule.java b/src/main/java/com/example/onlyone/domain/schedule/entity/Schedule.java index 2a45393b..2ddb1432 100644 --- a/src/main/java/com/example/onlyone/domain/schedule/entity/Schedule.java +++ b/src/main/java/com/example/onlyone/domain/schedule/entity/Schedule.java @@ -38,7 +38,7 @@ public class Schedule extends BaseTimeEntity { @Column(name = "cost") @NotNull - private int cost; + private Long cost; @Column(name = "user_limit") @NotNull @@ -60,7 +60,7 @@ public class Schedule extends BaseTimeEntity { @OneToOne(mappedBy = "schedule", cascade = CascadeType.ALL, orphanRemoval = true) private Settlement settlement; - public void update(String name, String location, int cost, int userLimit, LocalDateTime scheduleTime) { + public void update(String name, String location, Long cost, int userLimit, LocalDateTime scheduleTime) { this.name = name; this.location = location; this.cost = cost; diff --git a/src/main/java/com/example/onlyone/domain/schedule/service/ScheduleService.java b/src/main/java/com/example/onlyone/domain/schedule/service/ScheduleService.java index 77a03b3e..65fafcef 100644 --- a/src/main/java/com/example/onlyone/domain/schedule/service/ScheduleService.java +++ b/src/main/java/com/example/onlyone/domain/schedule/service/ScheduleService.java @@ -111,7 +111,7 @@ public ScheduleCreateResponseDto createSchedule(Long clubId, ScheduleRequestDto userChatRoomRepository.save(userChatRoom); Settlement settlement = Settlement.builder() .schedule(schedule) - .sum(0) // 정산 시작 시 참여자 수 * COST + .sum(0L) // 정산 시작 시 참여자 수 * COST .totalStatus(TotalStatus.HOLDING) .receiver(user) // 리더가 receiver .build(); @@ -140,7 +140,7 @@ public void updateSchedule(Long clubId, Long scheduleId, ScheduleRequestDto requ // 정산 금액이 변경되는 경우 if (schedule.getCost() != requestDto.getCost()) { int memberCount = userScheduleRepository.countBySchedule(schedule) - 1; - int delta = requestDto.getCost() - schedule.getCost(); + Long delta = requestDto.getCost() - schedule.getCost(); if (memberCount > 0 && delta != 0) { List targets = @@ -256,7 +256,7 @@ public void leaveSchedule(Long clubId, Long scheduleId) { return; } - final int amount = schedule.getCost(); + final Long amount = schedule.getCost(); int flag = walletRepository.releaseHoldBalance(user.getUserId(), amount); if (flag == 0) throw new CustomException(ErrorCode.WALLET_HOLD_STATE_CONFLICT); diff --git a/src/main/java/com/example/onlyone/domain/settlement/dto/event/OutboxEvent.java b/src/main/java/com/example/onlyone/domain/settlement/dto/event/OutboxEvent.java new file mode 100644 index 00000000..a2f8f3e9 --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/dto/event/OutboxEvent.java @@ -0,0 +1,34 @@ +package com.example.onlyone.domain.settlement.dto.event; + +import com.example.onlyone.domain.settlement.entity.OutboxStatus; +import jakarta.persistence.*; +import lombok.*; +import java.time.LocalDateTime; + +@Getter @Setter +@Entity @Table(name = "outbox_event", indexes = { + @Index(name = "idx_outbox_new", columnList = "status,id") + }) +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class OutboxEvent { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + private String aggregateType; // "UserSettlement" + private Long aggregateId; // userSettlementId + private String eventType; // "ParticipantSettlementResult" + private String keyString; // partition key (e.g., memberWalletId) + + @Lob + private String payload; // JSON + + @Enumerated(EnumType.STRING) + private OutboxStatus status; + + private LocalDateTime createdAt; + private LocalDateTime publishedAt; +} diff --git a/src/main/java/com/example/onlyone/domain/settlement/dto/event/SettlementProcessEvent.java b/src/main/java/com/example/onlyone/domain/settlement/dto/event/SettlementProcessEvent.java new file mode 100644 index 00000000..d0d65a13 --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/dto/event/SettlementProcessEvent.java @@ -0,0 +1,19 @@ +package com.example.onlyone.domain.settlement.dto.event; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.util.List; + +@Data +@AllArgsConstructor +public class SettlementProcessEvent { + private final Long settlementId; + private final Long scheduleId; + private final Long clubId; + private final Long leaderId; + private final Long leaderWalletId; + private final Long costPerUser; + private final Long totalAmount; + private final List targetUserIds; +} diff --git a/src/main/java/com/example/onlyone/domain/settlement/dto/event/UserSettlementStatusEvent.java b/src/main/java/com/example/onlyone/domain/settlement/dto/event/UserSettlementStatusEvent.java new file mode 100644 index 00000000..6e6420fc --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/dto/event/UserSettlementStatusEvent.java @@ -0,0 +1,34 @@ +package com.example.onlyone.domain.settlement.dto.event; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.time.Instant; + +@Data +@AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class UserSettlementStatusEvent { + public enum ResultType { SUCCESS, FAILED } + + private ResultType type; // "SUCCESS" | "FAILED" + private String operationId; // "stl:4:usr:100234:v1" + private Instant occurredAt; + + private long settlementId; + private long userSettlementId; + private long participantId; + + private long memberWalletId; + private long leaderId; + private long leaderWalletId; + private long amount; + + + @Data + public static class Snapshots { + private Long memberPostedBalance; + private Long leaderPostedBalance; + } +} diff --git a/src/main/java/com/example/onlyone/domain/settlement/dto/event/WalletCaptureFailedEvent.java b/src/main/java/com/example/onlyone/domain/settlement/dto/event/WalletCaptureFailedEvent.java new file mode 100644 index 00000000..c187d066 --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/dto/event/WalletCaptureFailedEvent.java @@ -0,0 +1,15 @@ +package com.example.onlyone.domain.settlement.dto.event; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class WalletCaptureFailedEvent { + private Long userSettlementId; + private Long memberWalletId; + private Long leaderWalletId; + private Long amount; + private Long memberBalanceBefore; + private Long leaderBalanceBefore; +} diff --git a/src/main/java/com/example/onlyone/domain/settlement/dto/event/WalletCaptureSucceededEvent.java b/src/main/java/com/example/onlyone/domain/settlement/dto/event/WalletCaptureSucceededEvent.java new file mode 100644 index 00000000..30c1f840 --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/dto/event/WalletCaptureSucceededEvent.java @@ -0,0 +1,15 @@ +package com.example.onlyone.domain.settlement.dto.event; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class WalletCaptureSucceededEvent { + private Long userSettlementId; + private Long memberWalletId; + private Long leaderWalletId; + private Long amount; + private Long memberBalanceAfter; + private Long leaderBalanceAfter; +} diff --git a/src/main/java/com/example/onlyone/domain/settlement/entity/OutboxStatus.java b/src/main/java/com/example/onlyone/domain/settlement/entity/OutboxStatus.java new file mode 100644 index 00000000..7649bc0a --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/entity/OutboxStatus.java @@ -0,0 +1,7 @@ +package com.example.onlyone.domain.settlement.entity; + +public enum OutboxStatus { + NEW, + PUBLISHED, + FAILED +} diff --git a/src/main/java/com/example/onlyone/domain/settlement/entity/Settlement.java b/src/main/java/com/example/onlyone/domain/settlement/entity/Settlement.java index 8daa8f97..a00fc7ed 100644 --- a/src/main/java/com/example/onlyone/domain/settlement/entity/Settlement.java +++ b/src/main/java/com/example/onlyone/domain/settlement/entity/Settlement.java @@ -32,7 +32,7 @@ public class Settlement extends BaseTimeEntity { @Column(name = "sum") @NotNull - private int sum; + private Long sum; @Column(name = "total_status") @NotNull @@ -56,7 +56,7 @@ public void update(TotalStatus totalStatus, LocalDateTime completedTime) { this.completedTime = completedTime; } - public void updateSum(int sum) { + public void updateSum(Long sum) { this.sum = sum; } diff --git a/src/main/java/com/example/onlyone/domain/settlement/repository/OutboxRepository.java b/src/main/java/com/example/onlyone/domain/settlement/repository/OutboxRepository.java new file mode 100644 index 00000000..42315c8f --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/repository/OutboxRepository.java @@ -0,0 +1,22 @@ +package com.example.onlyone.domain.settlement.repository; + +import com.example.onlyone.domain.settlement.dto.event.OutboxEvent; +import org.springframework.data.jpa.repository.*; +import org.springframework.data.repository.query.Param; +import org.springframework.stereotype.Repository; + +import java.util.List; + +@Repository +public interface OutboxRepository extends JpaRepository { + + @Query(value = """ + SELECT * FROM outbox_event + WHERE status = 'NEW' + ORDER BY id ASC + LIMIT :limit + FOR UPDATE SKIP LOCKED + """, nativeQuery = true) + List pickNewForUpdateSkipLocked(@Param("limit") int limit); + +} diff --git a/src/main/java/com/example/onlyone/domain/settlement/repository/SettlementRepository.java b/src/main/java/com/example/onlyone/domain/settlement/repository/SettlementRepository.java index b6443be7..d577b2a9 100644 --- a/src/main/java/com/example/onlyone/domain/settlement/repository/SettlementRepository.java +++ b/src/main/java/com/example/onlyone/domain/settlement/repository/SettlementRepository.java @@ -8,6 +8,7 @@ import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; +import java.time.LocalDateTime; import java.util.List; import java.util.Optional; @@ -24,6 +25,14 @@ AND total_status in ('HOLDING', 'FAILED') """, nativeQuery = true) int markProcessing(@Param("id") Long settlementId); + @Modifying(clearAutomatically = true, flushAutomatically = true) + @Query("UPDATE Settlement s " + + "SET s.totalStatus = :status, s.completedTime = :time " + + "WHERE s.settlementId = :id AND s.totalStatus <> 'COMPLETED'") + int markCompleted(@Param("id") Long id, + @Param("status") TotalStatus status, + @Param("time") LocalDateTime time); + @Modifying(clearAutomatically = true, flushAutomatically = true) @Query("delete from Settlement s where s.schedule.scheduleId = :scheduleId") void deleteByScheduleId(@Param("scheduleId") Long scheduleId); diff --git a/src/main/java/com/example/onlyone/domain/settlement/repository/UserSettlementRepository.java b/src/main/java/com/example/onlyone/domain/settlement/repository/UserSettlementRepository.java index 4c78d9d4..b5b44c2c 100644 --- a/src/main/java/com/example/onlyone/domain/settlement/repository/UserSettlementRepository.java +++ b/src/main/java/com/example/onlyone/domain/settlement/repository/UserSettlementRepository.java @@ -15,7 +15,6 @@ import org.springframework.data.repository.query.Param; import java.util.List; -import java.util.Map; import java.util.Optional; public interface UserSettlementRepository extends JpaRepository { @@ -117,7 +116,30 @@ Optional findByUserAndSchedule( List findAllBySettlement_SettlementIdAndSettlementStatus( Long settlementId, SettlementStatus settlementStatus); + @Query(""" + SELECT us.user.userId + FROM UserSettlement us + WHERE us.settlement.settlementId = :settlementId + AND us.settlementStatus = :settlementStatus +""") + List findAllUserSettlementIdsBySettlementIdAndStatus( + @Param("settlementId") Long settlementId, + @Param("settlementStatus") SettlementStatus settlementStatus); + + // 성능 개선: 참가자 수와 ID 목록을 한 번에 조회하는 메서드 (사용하지 않음 - 위의 메서드로 대체) + @Query(""" + SELECT us.user.userId + FROM UserSettlement us + WHERE us.settlement.settlementId = :settlementId + AND us.settlementStatus = :settlementStatus +""") + List findActiveParticipantIds(@Param("settlementId") Long settlementId, + @Param("settlementStatus") SettlementStatus settlementStatus); + + @Modifying(clearAutomatically = true, flushAutomatically = true) @Query("delete from UserSettlement us where us.settlement.settlementId = :settlementId") void deleteAllBySettlementId(@Param("settlementId") Long settlementId); + + Optional findBySettlement_SettlementIdAndUser_UserId(Long settlementId, Long participantId); } diff --git a/src/main/java/com/example/onlyone/domain/settlement/service/FailedEventAppender.java b/src/main/java/com/example/onlyone/domain/settlement/service/FailedEventAppender.java new file mode 100644 index 00000000..a92a8c6f --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/service/FailedEventAppender.java @@ -0,0 +1,66 @@ +package com.example.onlyone.domain.settlement.service; + +import com.example.onlyone.domain.settlement.dto.event.OutboxEvent; +import com.example.onlyone.domain.settlement.dto.event.UserSettlementStatusEvent; +import com.example.onlyone.domain.settlement.entity.OutboxStatus; +import com.example.onlyone.domain.settlement.repository.OutboxRepository; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import java.time.Instant; +import java.time.LocalDateTime; + +@Service +@RequiredArgsConstructor +public class FailedEventAppender { + + private final OutboxRepository outboxRepository; + private final ObjectMapper objectMapper; + + // 실패한 UserSettlement 이벤트를 Outbox에 REQUIRES_NEW 트랜잭션으로 기록 + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void appendFailedUserSettlementEvent(Long settlementId, + Long userSettlementId, + Long participantId, + Long memberWalletId, + Long leaderId, + Long leaderWalletId, + Long amount) { + try { + // 1. DTO로 변환 + UserSettlementStatusEvent eventDto = new UserSettlementStatusEvent( + UserSettlementStatusEvent.ResultType.FAILED, + "stl:%d:usr:%d:v1".formatted(settlementId, participantId), + Instant.now(), + settlementId, + userSettlementId, + participantId, + memberWalletId, + leaderId, + leaderWalletId, + amount + ); + + // 2. JSON 직렬화 + String json = objectMapper.writeValueAsString(eventDto); + + // 3. OutboxEvent 저장 + OutboxEvent event = OutboxEvent.builder() + .aggregateType("UserSettlement") + .aggregateId(userSettlementId) + .eventType("ParticipantSettlementResult") + .keyString(String.valueOf(memberWalletId)) // partition key + .payload(json) + .status(OutboxStatus.NEW) + .createdAt(LocalDateTime.now()) + .build(); + + outboxRepository.save(event); + } catch (Exception e) { + throw new RuntimeException("Failed to append FAILED event to Outbox", e); + } + } +} diff --git a/src/main/java/com/example/onlyone/domain/settlement/service/KafkaService.java b/src/main/java/com/example/onlyone/domain/settlement/service/KafkaService.java new file mode 100644 index 00000000..44ade44e --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/service/KafkaService.java @@ -0,0 +1,41 @@ +package com.example.onlyone.domain.settlement.service; + +import com.example.onlyone.global.config.kafka.KafkaProperties; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; + +@Log4j2 +@Service +@RequiredArgsConstructor +public class KafkaService { + + private final LedgerWriter ledgerWriter; + private final KafkaProperties props; + + // KafkaListener: Kafka 메시지를 받는 entry point + // 메시지는 List> 배치(batch) 형태 + // user-settlement.result.v1 구독 + @KafkaListener( + groupId = "ledger-writer", + containerFactory = "userSettlementLedgerKafkaListenerContainerFactory", + topics = "#{@kafkaProperties.consumer.userSettlementLedgerConsumerConfig.topic}", + concurrency = "8" + ) + public void onUserSettlementResultBatch(List> records, Acknowledgment ack) { + try { + ledgerWriter.writeBatch(records); + // 오프셋 커밋 + ack.acknowledge(); + } catch (Exception e) { + // throw해서 컨테이너 재시도 + throw e; + } + } +} diff --git a/src/main/java/com/example/onlyone/domain/settlement/service/LedgerWriter.java b/src/main/java/com/example/onlyone/domain/settlement/service/LedgerWriter.java new file mode 100644 index 00000000..1a2d9aaa --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/service/LedgerWriter.java @@ -0,0 +1,191 @@ +package com.example.onlyone.domain.settlement.service; + +import com.example.onlyone.domain.settlement.entity.UserSettlement; +import com.example.onlyone.domain.settlement.repository.TransferRepository; +import com.example.onlyone.domain.settlement.repository.UserSettlementRepository; +import com.example.onlyone.domain.wallet.entity.Transfer; +import com.example.onlyone.domain.wallet.entity.Wallet; +import com.example.onlyone.domain.wallet.entity.WalletTransaction; +import com.example.onlyone.domain.wallet.entity.WalletTransactionStatus; +import com.example.onlyone.domain.wallet.repository.WalletRepository; +import com.example.onlyone.domain.wallet.repository.WalletTransactionRepository; +import com.example.onlyone.domain.wallet.entity.Type; +import com.example.onlyone.global.exception.CustomException; +import com.example.onlyone.global.exception.ErrorCode; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.*; +import java.util.stream.Collectors; + +@Log4j2 +@Component +@RequiredArgsConstructor +// LedgerWriter: user-settlement.result.v1 토픽만 구독 +public class LedgerWriter { + + private final ObjectMapper objectMapper; + private final WalletTransactionRepository walletTransactionRepository; + private final TransferRepository transferRepository; + private final WalletRepository walletRepository; + private final UserSettlementRepository userSettlementRepository; + + /* + public class ConsumerRecord { + private final String topic; // 토픽명 + private final int partition; // 파티션 번호 + private final long offset; // 해당 파티션 내 오프셋 + private final K key; // Kafka 메시지 Key + private final V value; // Kafka 메시지 Value (JSON String) + private final long timestamp; // 메시지 발생/전송 시간 + ... + } + */ + + @Transactional + public void writeBatch(List> records) { + if (records == null || records.isEmpty()) { + return; + } + + List walletTransactionList = new ArrayList<>(); + List transferList = new ArrayList<>(); + + // 1) 파싱 및 candidate operationId 수집 + List events = records.stream() + .map(r -> parse(r.value())) + .toList(); + + Set candidateoperationIds = new HashSet<>(); + for (JsonNode root : events) { + String operationId = root.path("operationId").asText(); + if (operationId == null || operationId.isBlank()) continue; + candidateoperationIds.add(operationId + ":OUT"); + candidateoperationIds.add(operationId + ":IN"); + } + + // 2) 이미 처리된 operationId 조회 + Set existing = new HashSet<>(walletTransactionRepository.findExistingOperationIds(candidateoperationIds)); + + // 3) WalletTransaction / Transfer 생성 + Map walletTransactionHashMap = new HashMap<>(); + for (JsonNode root : events) { + String type = root.path("type").asText("SUCCESS"); + String operationId = root.path("operationId").asText(); + if (operationId == null || operationId.isBlank()) continue; + + long userSettlementId = root.path("userSettlementId").asLong(); + long memberWalletId = root.path("memberWalletId").asLong(); + long leaderWalletId = root.path("leaderWalletId").asLong(); + long amount = root.path("amount").asLong(); + + Wallet memberWallet = walletRepository.getReferenceById(memberWalletId); + Wallet leaderWallet = walletRepository.getReferenceById(leaderWalletId); + UserSettlement us = userSettlementRepository.getReferenceById(userSettlementId); + + WalletTransactionStatus status = + type.equals("SUCCESS") ? WalletTransactionStatus.COMPLETED : WalletTransactionStatus.FAILED; + + // OUTGOING + String outId = operationId + ":OUT"; + if (!existing.contains(outId)) { + WalletTransaction outTransaction = WalletTransaction.builder() + .operationId(outId) + .type(Type.OUTGOING) + .wallet(memberWallet) + .targetWallet(leaderWallet) + .amount(amount) + .balance(memberWallet.getPostedBalance()) + .walletTransactionStatus(status) + .build(); + walletTransactionList.add(outTransaction); + walletTransactionHashMap.put(outId, outTransaction); + + Transfer outTransfer = Transfer.builder() + .userSettlement(us) + .walletTransaction(outTransaction) + .build(); + transferList.add(outTransfer); + outTransaction.updateTransfer(outTransfer); + } + // INCOMING + String inId = operationId + ":IN"; + if (!existing.contains(inId)) { + WalletTransaction inTransaction = WalletTransaction.builder() + .operationId(inId) + .type(Type.INCOMING) + .wallet(leaderWallet) + .targetWallet(memberWallet) + .amount(amount) + .balance(leaderWallet.getPostedBalance()) + .walletTransactionStatus(status) + .build(); + walletTransactionList.add(inTransaction); + walletTransactionHashMap.put(inId, inTransaction); + + Transfer inTransfer = Transfer.builder() + .userSettlement(us) + .walletTransaction(inTransaction) + .build(); + transferList.add(inTransfer); + inTransaction.updateTransfer(inTransfer); + } + } + + // 4) WalletTransaction 저장 (배치 + 충돌 시 개별 재시도) + if (!walletTransactionList.isEmpty()) { + try { + walletTransactionRepository.saveAll(walletTransactionList); + walletTransactionRepository.flush(); + } catch (DataIntegrityViolationException dup) { + insertIndividuallyIgnoringDuplicate(walletTransactionList); + } + } + + // 5) Transfer 저장 (성능 개선: 배치 크기 제한) + if (!transferList.isEmpty()) { + try { + // 배치 크기를 1000으로 제한하여 메모리 사용량 최적화 + int batchSize = 1000; + for (int i = 0; i < transferList.size(); i += batchSize) { + int endIndex = Math.min(i + batchSize, transferList.size()); + List batch = transferList.subList(i, endIndex); + transferRepository.saveAll(batch); + } + transferRepository.flush(); + } catch (DataIntegrityViolationException dup) { + // 필요시 개별 재시도 + } + } + } + + private JsonNode parse(String s) { + try { return objectMapper.readTree(s); } + catch (Exception e) { + throw new CustomException(ErrorCode.INVALID_EVENT_PAYLOAD); + } + } + + private void insertIndividuallyIgnoringDuplicate(List walletTransactionList) { + Set existing = new HashSet<>( + walletTransactionRepository.findExistingOperationIds( + walletTransactionList.stream().map(WalletTransaction::getOperationId).collect(Collectors.toSet()) + ) + ); + for (WalletTransaction walletTransaction : walletTransactionList) { + if (existing.contains(walletTransaction.getOperationId())) continue; + try { + walletTransactionRepository.saveAndFlush(walletTransaction); + } catch (DataIntegrityViolationException ignored) { + // 동시경합으로 중복키면 그냥 스킵 + } + } + } +} diff --git a/src/main/java/com/example/onlyone/domain/settlement/service/OutboxAppender.java b/src/main/java/com/example/onlyone/domain/settlement/service/OutboxAppender.java new file mode 100644 index 00000000..6bc74df6 --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/service/OutboxAppender.java @@ -0,0 +1,39 @@ +package com.example.onlyone.domain.settlement.service; + +import com.example.onlyone.domain.settlement.dto.event.OutboxEvent; +import com.example.onlyone.domain.settlement.entity.OutboxStatus; +import com.example.onlyone.domain.settlement.repository.OutboxRepository; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; + +@Service +@RequiredArgsConstructor +public class OutboxAppender { + + private final OutboxRepository outboxRepository; + private final ObjectMapper objectMapper; + + @Transactional + public void append(String aggregateType, Long aggregateId, + String eventType, String keyString, Object payloadObj) { + try { + String json = (payloadObj instanceof String s) ? s : objectMapper.writeValueAsString(payloadObj); + OutboxEvent e = OutboxEvent.builder() + .aggregateType(aggregateType) + .aggregateId(aggregateId) + .eventType(eventType) + .keyString(keyString) + .payload(json) + .status(OutboxStatus.NEW) + .createdAt(LocalDateTime.now()) + .build(); + outboxRepository.save(e); + } catch (Exception ex) { + throw new RuntimeException("Outbox append failed", ex); + } + } +} diff --git a/src/main/java/com/example/onlyone/domain/settlement/service/SettlementKafkaEventListener.java b/src/main/java/com/example/onlyone/domain/settlement/service/SettlementKafkaEventListener.java new file mode 100644 index 00000000..c153211d --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/service/SettlementKafkaEventListener.java @@ -0,0 +1,203 @@ +package com.example.onlyone.domain.settlement.service; + +import com.example.onlyone.domain.schedule.entity.Schedule; +import com.example.onlyone.domain.schedule.entity.ScheduleStatus; +import com.example.onlyone.domain.schedule.repository.ScheduleRepository; +import com.example.onlyone.domain.settlement.dto.event.SettlementProcessEvent; +import com.example.onlyone.domain.settlement.entity.Settlement; +import com.example.onlyone.domain.settlement.entity.TotalStatus; +import com.example.onlyone.domain.settlement.repository.SettlementRepository; +import com.example.onlyone.domain.settlement.repository.UserSettlementRepository; +import com.example.onlyone.domain.user.repository.UserRepository; +import com.example.onlyone.domain.wallet.repository.WalletRepository; +import com.example.onlyone.domain.wallet.service.WalletService; +import com.example.onlyone.global.exception.CustomException; +import com.example.onlyone.global.exception.ErrorCode; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.persistence.EntityManager; +import jakarta.persistence.PersistenceContext; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.StructuredTaskScope; +import java.util.concurrent.atomic.AtomicLong; + +@Component +@Slf4j +public class SettlementKafkaEventListener { + private final ObjectMapper objectMapper; + + // 백프레셔 제어를 위한 세마포어 + private final Semaphore concurrencyLimit; + + private final UserSettlementRepository userSettlementRepository; + private final UserSettlementService userSettlementService; + private final WalletRepository walletRepository; + private final WalletService walletService; + private final SettlementRepository settlementRepository; + private final ScheduleRepository scheduleRepository; + private final UserRepository userRepository; + @PersistenceContext + private EntityManager entityManager; + + // 생성자에서 세마포어 초기화 + public SettlementKafkaEventListener( + ObjectMapper objectMapper, + UserSettlementRepository userSettlementRepository, + UserSettlementService userSettlementService, + WalletRepository walletRepository, + WalletService walletService, + SettlementRepository settlementRepository, + ScheduleRepository scheduleRepository, + UserRepository userRepository, + @Value("${app.settlement.concurrency:32}") int concurrencyLimit + ) { + this.objectMapper = objectMapper; + this.userSettlementRepository = userSettlementRepository; + this.userSettlementService = userSettlementService; + this.walletRepository = walletRepository; + this.walletService = walletService; + this.settlementRepository = settlementRepository; + this.scheduleRepository = scheduleRepository; + this.userRepository = userRepository; + this.concurrencyLimit = new Semaphore(concurrencyLimit); + } + + // settlement.process.v1 토픽 구독 + @KafkaListener( + groupId = "settlement-orchestrator", + containerFactory = "settlementProcessKafkaListenerContainerFactory", + topics = "#{@kafkaProperties.producer.settlementProcessProducerConfig.topic}", + concurrency = "3" + ) + public void onSettlementProcess(List> records, Acknowledgment ack) { + try { + for (ConsumerRecord rec : records) { + SettlementProcessEvent event = parse(rec.value()); + processSettlementWithStructuredScope(event); + } + ack.acknowledge(); // 성공 시 배치 커밋 + } catch (Exception e) { + throw e; + } + } + + private SettlementProcessEvent parse(String json) { + try { + JsonNode root = objectMapper.readTree(json); + JsonNode payload = root.has("payload") ? root.get("payload") : root; + return objectMapper.treeToValue(payload, SettlementProcessEvent.class); + } catch (Exception e) { + throw new CustomException(ErrorCode.INVALID_EVENT_PAYLOAD); + } + } + + // StructuredTaskScope + Semaphore(백프레셔 제어용) + private void processSettlementWithStructuredScope(SettlementProcessEvent event) { + try (StructuredTaskScope.ShutdownOnFailure scope = + new StructuredTaskScope.ShutdownOnFailure("settlement-parallel", Thread.ofVirtual().factory())) { + + List targetUserIds = event.getTargetUserIds(); + AtomicLong totalProcessedAmount = new AtomicLong(0); + + // 각 참가자별로 가상 스레드 생성 + 세마포어 백프레셔 제어 + for (Long participantId : targetUserIds) { + scope.fork(() -> { + // 세마포어로 동시 실행 수 제한 + try { + concurrencyLimit.acquire(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return null; + } + + try { + return processParticipantWithRetry( + event.getSettlementId(), + event.getLeaderId(), + event.getLeaderWalletId(), + participantId, + event.getCostPerUser(), + totalProcessedAmount + ); + } finally { + concurrencyLimit.release(); + } + }); + } + scope.join(); + scope.throwIfFailed(); + + // 모든 참가자 완료 후 리더 크레딧 및 상태 업데이트 + completeSettlement(event, totalProcessedAmount.get()); + } catch (Exception e) { + throw new CustomException(ErrorCode.SETTLEMENT_PROCESS_FAILED); + } + } + + private Long processParticipantWithRetry(Long settlementId, Long leaderId, Long leaderWalletId, + Long participantId, Long costPerUser, AtomicLong totalAmount) { + int maxRetries = 3; + int retryDelay = 1000; + + for (int attempt = 1; attempt <= maxRetries; attempt++) { + try { + // 참가자별 개별 트랜잭션 처리 (REQUIRES_NEW + Redis Lua 게이트는 UserSettlementService 내부) + userSettlementService.processParticipantSettlement( + settlementId, + leaderId, + leaderWalletId, + participantId, + costPerUser + ); + // 처리된 금액을 원자적으로 누적 + totalAmount.addAndGet(costPerUser); + + return participantId; + + } catch (Exception e) { + if (attempt == maxRetries) { + throw new CustomException(ErrorCode.SETTLEMENT_PROCESS_FAILED); + } + try { + Thread.sleep(retryDelay * attempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new CustomException(ErrorCode.SETTLEMENT_PROCESS_FAILED); + } + } + } + return participantId; + } + + @Transactional + public void completeSettlement(SettlementProcessEvent event, long totalProcessedAmount) { + try { + // 리더에게 크레딧 + userSettlementService.creditToLeader(event.getLeaderId(), totalProcessedAmount); + + // 스케줄 상태 업데이트 + Schedule completedSchedule = scheduleRepository.findById(event.getScheduleId()) + .orElseThrow(() -> new CustomException(ErrorCode.SCHEDULE_NOT_FOUND)); + completedSchedule.updateStatus(ScheduleStatus.CLOSED); + scheduleRepository.save(completedSchedule); + + // 정산 상태 업데이트 + Settlement completedSettlement = settlementRepository.findById(event.getSettlementId()) + .orElseThrow(() -> new CustomException(ErrorCode.SETTLEMENT_NOT_FOUND)); + completedSettlement.update(TotalStatus.COMPLETED, LocalDateTime.now()); + settlementRepository.save(completedSettlement); + } catch (Exception e) { + throw e; + } + } +} \ No newline at end of file diff --git a/src/main/java/com/example/onlyone/domain/settlement/service/SettlementProcessEventListener.java b/src/main/java/com/example/onlyone/domain/settlement/service/SettlementProcessEventListener.java new file mode 100644 index 00000000..e54b9660 --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/service/SettlementProcessEventListener.java @@ -0,0 +1,134 @@ +package com.example.onlyone.domain.settlement.service; + +import com.example.onlyone.domain.schedule.entity.Schedule; +import com.example.onlyone.domain.schedule.entity.ScheduleStatus; +import com.example.onlyone.domain.schedule.repository.ScheduleRepository; +import com.example.onlyone.domain.settlement.dto.event.SettlementProcessEvent; +import com.example.onlyone.domain.settlement.entity.Settlement; +import com.example.onlyone.domain.settlement.entity.TotalStatus; +import com.example.onlyone.domain.settlement.repository.SettlementRepository; +import com.example.onlyone.domain.settlement.repository.UserSettlementRepository; +import com.example.onlyone.domain.user.repository.UserRepository; +import com.example.onlyone.domain.wallet.repository.WalletRepository; +import com.example.onlyone.domain.wallet.service.WalletService; +import com.example.onlyone.global.exception.CustomException; +import com.example.onlyone.global.exception.ErrorCode; +import jakarta.persistence.EntityManager; +import jakarta.persistence.PersistenceContext; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + +@Component +@RequiredArgsConstructor +@Slf4j +public class SettlementProcessEventListener { + + private final UserSettlementRepository userSettlementRepository; + private final UserSettlementService userSettlementService; + private final WalletRepository walletRepository; + private final WalletService walletService; + private final SettlementRepository settlementRepository; + private final ScheduleRepository scheduleRepository; + private final UserRepository userRepository; + + @PersistenceContext + private EntityManager em; + + @Async("settlementExecutor") + @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) + public void handleSettlementProcess(SettlementProcessEvent event) { + try { + processSettlement(event); + } catch (Exception e) { + // 필요 시 실패 알림/아웃박스 + } + } + + @Transactional + public void processSettlement(SettlementProcessEvent event) { + List succeeded = new ArrayList<>(); + List failed = new ArrayList<>(); + long totalProcessedAmount = 0; + + try { + for (Long participantId : event.getTargetUserIds()) { + boolean ok = processParticipantWithRetry( + event.getSettlementId(), + event.getLeaderId(), + event.getLeaderWalletId(), + participantId, + event.getCostPerUser() + ); + + if (ok) { + succeeded.add(participantId); + totalProcessedAmount += event.getCostPerUser(); + } else { + failed.add(participantId); + } + } + + if (!failed.isEmpty()) { + // 실패자 존재 시 → 리더 가산/완료 처리 금지 + throw new CustomException(ErrorCode.SETTLEMENT_PROCESS_FAILED); + } + + // 전원 성공 시에만 리더 가산 + userSettlementService.creditToLeader(event.getLeaderId(), totalProcessedAmount); + + // 스케줄 CLOSED + Schedule completedSchedule = scheduleRepository.findById(event.getScheduleId()) + .orElseThrow(() -> new CustomException(ErrorCode.SCHEDULE_NOT_FOUND)); + completedSchedule.updateStatus(ScheduleStatus.CLOSED); + scheduleRepository.save(completedSchedule); + + // 정산 COMPLETED + Settlement completedSettlement = settlementRepository.findById(event.getSettlementId()) + .orElseThrow(() -> new CustomException(ErrorCode.SETTLEMENT_NOT_FOUND)); + completedSettlement.update(TotalStatus.COMPLETED, LocalDateTime.now()); + settlementRepository.save(completedSettlement); + + } catch (Exception e) { + throw e; // 상위(비동기 핸들러)에서 로깅/알림 + } + } + + private boolean processParticipantWithRetry(Long settlementId, + Long leaderId, + Long leaderWalletId, + Long participantId, + Long amount) { + final int maxRetries = 3; + final int baseDelayMs = 400; + + for (int attempt = 1; attempt <= maxRetries; attempt++) { + try { + userSettlementService.processParticipantSettlement( + settlementId, leaderId, leaderWalletId, participantId, amount + ); + return true; + } catch (Exception e) { + // 마지막 시도 실패면 false + if (attempt == maxRetries) { + return false; + } + try { + Thread.sleep((long) baseDelayMs * attempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return false; + } + } + } + return false; + } +} diff --git a/src/main/java/com/example/onlyone/domain/settlement/service/SettlementService.java b/src/main/java/com/example/onlyone/domain/settlement/service/SettlementService.java index b5e1850a..5aac239f 100644 --- a/src/main/java/com/example/onlyone/domain/settlement/service/SettlementService.java +++ b/src/main/java/com/example/onlyone/domain/settlement/service/SettlementService.java @@ -9,6 +9,7 @@ import com.example.onlyone.domain.schedule.entity.UserSchedule; import com.example.onlyone.domain.schedule.repository.ScheduleRepository; import com.example.onlyone.domain.schedule.repository.UserScheduleRepository; +import com.example.onlyone.domain.settlement.dto.event.SettlementProcessEvent; import com.example.onlyone.domain.settlement.dto.response.SettlementResponseDto; import com.example.onlyone.domain.settlement.dto.response.UserSettlementDto; import com.example.onlyone.domain.settlement.entity.*; @@ -36,6 +37,7 @@ import java.time.LocalDateTime; import java.util.List; +import java.util.Map; @Log4j2 @Service @@ -51,269 +53,88 @@ public class SettlementService { private final WalletRepository walletRepository; private final NotificationService notificationService; private final WalletService walletService; + private final ApplicationEventPublisher eventPublisher; + private final OutboxAppender outboxAppender; -// /* 정산 Status를 REQUESTED -> COMPLETED로 스케줄링 (낙관적 락 적용)*/ -// @Scheduled(cron = "0 0 0 * * *") -// @Transactional -// public void updateTotalStatusIfAllCompleted() { -// List settlements = settlementRepository.findAllByTotalStatus(TotalStatus.REQUESTED); -// for (Settlement settlement : settlements) { -// long totalCount = userSettlementRepository.countBySettlement(settlement); -// long completedCount = userSettlementRepository.countBySettlementAndSettlementStatus(settlement, SettlementStatus.COMPLETED); -// User leader = userScheduleRepository.findLeaderByScheduleAndScheduleRole(settlement.getSchedule(), ScheduleRole.LEADER) -// .orElseThrow(() -> new CustomException(ErrorCode.LEADER_NOT_FOUND)); -// // 모든 정산이 완료된 경우 -// if (totalCount > 0 && totalCount == completedCount) { -// settlement.update(TotalStatus.COMPLETED, LocalDateTime.now()); -// settlementRepository.save(settlement); -// settlement.getSchedule().updateStatus(ScheduleStatus.CLOSED); -// // 정산 리더에게 완료 알림 -// notificationService.createNotification(leader, Type.SETTLEMENT, new String[]{String.valueOf(settlement.getSum())}); -// } -// } -// } - - /* 자동 정산 수행 */ @Transactional(rollbackFor = Exception.class) public void automaticSettlement(Long clubId, Long scheduleId) { + // 현재 사용자 조회 User user = userService.getCurrentUser(); - clubRepository.findById(clubId) - .orElseThrow(() -> new CustomException(ErrorCode.CLUB_NOT_FOUND)); + + // 클럽 존재 여부만 확인 (실제 엔티티는 사용하지 않음) + if (!clubRepository.existsById(clubId)) { + throw new CustomException(ErrorCode.CLUB_NOT_FOUND); + } + + // 스케줄 조회 Schedule schedule = scheduleRepository.findById(scheduleId) .orElseThrow(() -> new CustomException(ErrorCode.SCHEDULE_NOT_FOUND)); + // 종료된 스케줄인지 확인 - if (!(schedule.getScheduleStatus() == ScheduleStatus.ENDED || schedule.getScheduleTime().isBefore(LocalDateTime.now()))) { + if (!(schedule.getScheduleStatus() == ScheduleStatus.ENDED + || schedule.getScheduleTime().isBefore(LocalDateTime.now()))) { throw new CustomException(ErrorCode.BEFORE_SCHEDULE_END); } - UserSchedule leaderUserSchedule = userScheduleRepository.findByUserAndSchedule(user, schedule) - .orElseThrow(() -> new CustomException(ErrorCode.USER_SCHEDULE_NOT_FOUND)); - // 리더가 호출하고 있는지 확인 - if (leaderUserSchedule.getScheduleRole() != ScheduleRole.LEADER) { - throw new CustomException(ErrorCode.MEMBER_CANNOT_CREATE_SETTLEMENT); - } - - int userCount = userScheduleRepository.countBySchedule(schedule); + + // 정산 조회 Settlement settlement = settlementRepository.findBySchedule(schedule) .orElseThrow(() -> new CustomException(ErrorCode.SETTLEMENT_NOT_FOUND)); - - // 비용이 0원이거나 참여자가 1명(리더만)인 경우 → 바로 CLOSED 처리 후 리턴 - if (schedule.getCost() == 0 || userCount <= 1) { - schedule.updateStatus(ScheduleStatus.CLOSED); - schedule.removeSettlement(settlement); - settlementRepository.findBySchedule(schedule).ifPresent(s -> { - userSettlementRepository.deleteAllBySettlementId(settlement.getSettlementId()); - settlementRepository.deleteByScheduleId(schedule.getScheduleId()); - }); - return; + + if(settlement.getTotalStatus() == TotalStatus.COMPLETED || schedule.getScheduleStatus() == ScheduleStatus.CLOSED) { + throw new CustomException(ErrorCode.ALREADY_COMPLETED_SETTLEMENT); } - - // 멱등성 보장 & 진행 선점: HOLDING → IN_PROGRESS 선점 + + // 선점 트랜잭션 처리: HOLDING|FAILED → IN_PROGRESS (먼저 처리하여 동시성 제어) int updated = settlementRepository.markProcessing(settlement.getSettlementId()); if (updated != 1) { throw new CustomException(ErrorCode.ALREADY_SETTLING_SCHEDULE); } - // 이미 정산 중인 스케줄 예외 처리 - if (!((settlement.getTotalStatus() == TotalStatus.HOLDING) || (settlement.getTotalStatus() == TotalStatus.FAILED))) { - throw new CustomException(ErrorCode.ALREADY_SETTLING_SCHEDULE); + // 참가자 ID 목록과 수를 한 번에 조회 (성능 개선) + List targetUserIds = + userSettlementRepository.findAllUserSettlementIdsBySettlementIdAndStatus( + settlement.getSettlementId(), SettlementStatus.HOLD_ACTIVE); + + long userCount = targetUserIds.size(); + + // 가격이 0원이거나 참여자가 리더 1명인 경우 → 스케줄 종료 처리 + if (schedule.getCost() == 0 || userCount == 0) { + schedule.updateStatus(ScheduleStatus.CLOSED); + schedule.removeSettlement(settlement); + return; } - // 정산의 sum 업데이트 (count할 때는 리더 제외) - int totalAmount = (userCount - 1) * schedule.getCost(); + // 총 금액 계산 (리더 제외) + long totalAmount = userCount * schedule.getCost(); settlement.updateSum(totalAmount); - schedule.updateStatus(ScheduleStatus.SETTLING); - - User leader = userScheduleRepository.findLeaderByScheduleAndScheduleRole(schedule, ScheduleRole.LEADER) - .orElseThrow(() -> new CustomException(ErrorCode.LEADER_NOT_FOUND)); - Wallet leaderWallet = walletRepository.findByUserWithoutLock(leader) - .orElseThrow(() -> new CustomException(ErrorCode.WALLET_NOT_FOUND)); - - Wallet failWallet = null; - long failUserSettlementId = 0L; + settlementRepository.save(settlement); - // 자동 정산 수행 - try { - // 원자적 이체 - List targets = userSettlementRepository - .findAllBySettlement_SettlementIdAndSettlementStatus(settlement.getSettlementId(), SettlementStatus.HOLD_ACTIVE); + // 스케줄 저장 + scheduleRepository.save(schedule); - for (UserSettlement userSettlement : targets) { - Wallet memberWallet = walletRepository.findByUserWithoutLock(userSettlement.getUser()) - .orElseThrow(() -> new CustomException(ErrorCode.WALLET_NOT_FOUND)); - failWallet = memberWallet; - failUserSettlementId = userSettlement.getUserSettlementId(); - // 1) 홀드 캡처 (balance -= amt, hold -= amt) : 0행이면 비정상 → 예외 - int captured = walletRepository.captureHold(userSettlement.getUser().getUserId(), schedule.getCost()); - if (captured != 1) { - throw new CustomException(ErrorCode.WALLET_HOLD_CAPTURE_FAILED); - } - // 2) 리더 가산 - int credited = walletRepository.creditByUserId(leader.getUserId(), schedule.getCost()); - if (credited != 1) { - throw new CustomException(ErrorCode.WALLET_CREDIT_APPLY_FAILED); - } - // 3) 트랜잭션 기록 (멱등키: settlementId-userId) - walletService.createSuccessfulWalletTransactions( - memberWallet.getWalletId(), leaderWallet.getWalletId(), - schedule.getCost(), userSettlement); - - // 4) 상태 변경 - userSettlement.updateUserSettlement(SettlementStatus.COMPLETED, LocalDateTime.now()); - userSettlementRepository.save(userSettlement); - - // 5) 알림 -// notificationService.createNotification( -// userSettlement.getUser(), -// Type.SETTLEMENT, -// new String[]{String.valueOf(schedule.getCost())}); - } - // 모두 성공한 경우 - Schedule completedSchedule = scheduleRepository.findById(scheduleId) - .orElseThrow(() -> new CustomException(ErrorCode.SCHEDULE_NOT_FOUND)); - Settlement completedSettlement = settlementRepository.findBySchedule(completedSchedule) - .orElseThrow(() -> new CustomException(ErrorCode.SETTLEMENT_NOT_FOUND)); - completedSettlement.update(TotalStatus.COMPLETED, LocalDateTime.now()); - completedSchedule.updateStatus(ScheduleStatus.CLOSED); -// notificationService.createNotification( -// user, -// Type.SETTLEMENT, -// new String[]{String.valueOf(settlement.getSum())}); - // 예외를 잡아 별도 실패 기록 - } catch (CustomException e) { - Schedule failedSchedule = scheduleRepository.findById(scheduleId) - .orElseThrow(() -> new CustomException(ErrorCode.SETTLEMENT_NOT_FOUND)); - Settlement faildSettlement = settlementRepository.findBySchedule(failedSchedule) - .orElseThrow(() -> new CustomException(ErrorCode.SETTLEMENT_NOT_FOUND)); - faildSettlement.updateTotalStatus(TotalStatus.FAILED); - registerFailureLogAfterRollback(failWallet.getWalletId(), leaderWallet.getWalletId(), schedule.getCost(), failUserSettlementId, failWallet.getPostedBalance(), leaderWallet.getPostedBalance()); - throw e; - } catch (Exception e) { - Schedule failedSchedule = scheduleRepository.findById(scheduleId) - .orElseThrow(() -> new CustomException(ErrorCode.SETTLEMENT_NOT_FOUND)); - Settlement faildSettlement = settlementRepository.findBySchedule(failedSchedule) - .orElseThrow(() -> new CustomException(ErrorCode.SETTLEMENT_NOT_FOUND)); - faildSettlement.updateTotalStatus(TotalStatus.FAILED); - registerFailureLogAfterRollback(failWallet.getWalletId(), leaderWallet.getWalletId(), schedule.getCost(), failUserSettlementId, failWallet.getPostedBalance(), leaderWallet.getPostedBalance()); - throw new CustomException(ErrorCode.SETTLEMENT_PROCESS_FAILED); - } - } - -// /* 정산 요청 생성 */ -// @Deprecated -// public void createSettlement(Long clubId, Long scheduleId) { -// User user = userService.getCurrentUser(); -// clubRepository.findById(clubId) -// .orElseThrow(() -> new CustomException(ErrorCode.CLUB_NOT_FOUND)); -// Schedule schedule = scheduleRepository.findById(scheduleId) -// .orElseThrow(() -> new CustomException(ErrorCode.SCHEDULE_NOT_FOUND)); -// // 종료된 스케줄인지 확인 -// if (!(schedule.getScheduleStatus() == ScheduleStatus.ENDED || schedule.getScheduleTime().isBefore(LocalDateTime.now()))) { -// throw new CustomException(ErrorCode.BEFORE_SCHEDULE_END); -// } -// UserSchedule leaderUserSchedule = userScheduleRepository.findByUserAndSchedule(user, schedule) -// .orElseThrow(() -> new CustomException(ErrorCode.USER_SCHEDULE_NOT_FOUND)); -// // 리더가 호출하고 있는지 확인 -// if (leaderUserSchedule.getScheduleRole() != ScheduleRole.LEADER) { -// throw new CustomException(ErrorCode.MEMBER_CANNOT_CREATE_SETTLEMENT); -// } -// int userCount = userScheduleRepository.countBySchedule(schedule); -// // 비용이 0원이거나 참여자가 1명(리더만)인 경우 → 바로 CLOSED 처리 후 리턴 -// if (schedule.getCost() == 0 || userCount <= 1) { -// schedule.updateStatus(ScheduleStatus.CLOSED); -// return; -// } -// schedule.updateStatus(ScheduleStatus.SETTLING); -// int totalAmount = (userCount - 1) * schedule.getCost(); -// Settlement settlement = Settlement.builder() -// .schedule(schedule) -// .sum(totalAmount) -// .totalStatus(TotalStatus.REQUESTED) -// .receiver(user) -// .build(); -// settlementRepository.save(settlement); -// List userSchedules = userScheduleRepository.findUserSchedulesBySchedule(schedule); -// userSchedules.remove(leaderUserSchedule); -// List userSettlements = userSchedules.stream() -// .map(userSchedule -> UserSettlement.builder() -// .user(userSchedule.getUser()) -// .settlement(settlement) -// .settlementStatus(SettlementStatus.REQUESTED) -// .build()) -// .toList(); -// userSettlementRepository.saveAll(userSettlements); -// } -// -// /* 참여자의 정산 수행 */ -// @Deprecated -// @Transactional(rollbackFor = Exception.class) -// public void updateUserSettlement(Long clubId, Long scheduleId) { -// User user = userService.getCurrentUser(); -// // 검증 로직 -// clubRepository.findById(clubId) -// .orElseThrow(() -> new CustomException(ErrorCode.CLUB_NOT_FOUND)); -// Schedule schedule = scheduleRepository.findById(scheduleId) -// .orElseThrow(() -> new CustomException(ErrorCode.SCHEDULE_NOT_FOUND)); -// UserSettlement userSettlement = userSettlementRepository.findByUserAndSchedule(user, schedule) -// .orElseThrow(() -> new CustomException(ErrorCode.USER_SETTLEMENT_NOT_FOUND)); -// userScheduleRepository.findByUserAndSchedule(user, schedule) -// .orElseThrow(() -> new CustomException(ErrorCode.USER_SCHEDULE_NOT_FOUND)); -// if (userSettlement.getSettlement().getTotalStatus() == TotalStatus.COMPLETED || -// userSettlement.getSettlementStatus() == SettlementStatus.COMPLETED) { -// throw new CustomException(ErrorCode.ALREADY_SETTLED_USER); -// } -// User leader = userScheduleRepository.findLeaderByScheduleAndScheduleRole(schedule, ScheduleRole.LEADER) -// .orElseThrow(() -> new CustomException(ErrorCode.LEADER_NOT_FOUND)); -// // 비관적 락으로 Wallet 조회 -// Wallet wallet = walletRepository.findByUser(user) -// .orElseThrow(() -> new CustomException(ErrorCode.WALLET_NOT_FOUND)); -// Wallet leaderWallet = walletRepository.findByUser(leader) -// .orElseThrow(() -> new CustomException(ErrorCode.WALLET_NOT_FOUND)); -// int amount = schedule.getCost(); -// try { -// // 잔액 부족 확인 -// if (wallet.getPostedBalance() < amount) { -// throw new CustomException(ErrorCode.WALLET_BALANCE_NOT_ENOUGH); -// } -// // 1. 잔액 변경 전 상태 저장 -// int beforeBalance = wallet.getPostedBalance(); -// int leaderBeforeBalance = leaderWallet.getPostedBalance(); -// // 2. 실제 잔액 변경 -// wallet.updateBalance(beforeBalance - amount); -// leaderWallet.updateBalance(leaderBeforeBalance + amount); -// // 3. 변경된 잔액으로 WalletTransaction 생성 -// walletService.createSuccessfulWalletTransactions( -// wallet.getWalletId(), leaderWallet.getWalletId(), amount, -// userSettlement -// ); -// // 4. UserSettlement 상태 변경 -// userSettlement.updateUserSettlement(SettlementStatus.COMPLETED, LocalDateTime.now()); // PENDING -> COMPLETED -// // 5. 모든 변경사항 저장 -// walletRepository.save(wallet); -// walletRepository.save(leaderWallet); -// userSettlementRepository.save(userSettlement); -// // 6. 알림 -// notificationService.createNotification(user, -// Type.SETTLEMENT, -// new String[]{String.valueOf(amount)}); -// } catch (CustomException e) { -// registerFailureLogAfterRollback(wallet.getWalletId(), leaderWallet.getWalletId(), amount, userSettlement.getUserSettlementId(), wallet.getPostedBalance(), leaderWallet.getPostedBalance()); -// throw e; -// } catch (Exception e) { -// registerFailureLogAfterRollback(wallet.getWalletId(), leaderWallet.getWalletId(), amount, userSettlement.getUserSettlementId(), wallet.getPostedBalance(), leaderWallet.getPostedBalance()); -// throw new CustomException(ErrorCode.SETTLEMENT_PROCESS_FAILED); -// } -// } + // 리더 지갑 조회 + Wallet leaderWallet = walletRepository.findByUserWithoutLock(user) + .orElseThrow(() -> new CustomException(ErrorCode.WALLET_NOT_FOUND)); - /* 트랜잭션 롤백 후 실패 로그를 기록하기 위한 메서드*/ - protected void registerFailureLogAfterRollback(long wId, long lwId, int amount, - long usId, int wBal, int lwBal) { - TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { - @Override public void afterCompletion(int status) { - if (status == STATUS_ROLLED_BACK) { - walletService.createFailedWalletTransactions(wId, lwId, amount, usId, wBal, lwBal); - } - } - }); + // 정산 프로세스 이벤트 발행 + outboxAppender.append( + "Settlement", + settlement.getSettlementId(), + "SettlementProcessEvent", + String.valueOf(settlement.getSettlementId()), + Map.of( + "eventId", java.util.UUID.randomUUID().toString(), + "occurredAt", java.time.Instant.now().toString(), + "settlementId", settlement.getSettlementId(), + "scheduleId", scheduleId, + "clubId", clubId, + "leaderId", user.getUserId(), + "leaderWalletId", leaderWallet.getWalletId(), + "costPerUser", schedule.getCost(), + "totalAmount", totalAmount, + "targetUserIds", targetUserIds + ) + ); } /* 스케줄 참여자 정산 목록 조회 */ diff --git a/src/main/java/com/example/onlyone/domain/settlement/service/UserSettlementService.java b/src/main/java/com/example/onlyone/domain/settlement/service/UserSettlementService.java new file mode 100644 index 00000000..925f5e62 --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/settlement/service/UserSettlementService.java @@ -0,0 +1,154 @@ +package com.example.onlyone.domain.settlement.service; + +import com.example.onlyone.domain.schedule.repository.ScheduleRepository; +import com.example.onlyone.domain.settlement.dto.event.WalletCaptureFailedEvent; +import com.example.onlyone.domain.settlement.dto.event.WalletCaptureSucceededEvent; +import com.example.onlyone.domain.settlement.entity.SettlementStatus; +import com.example.onlyone.domain.settlement.entity.UserSettlement; +import com.example.onlyone.domain.settlement.repository.SettlementRepository; +import com.example.onlyone.domain.settlement.repository.UserSettlementRepository; +import com.example.onlyone.domain.user.entity.User; +import com.example.onlyone.domain.user.repository.UserRepository; +import com.example.onlyone.domain.wallet.entity.Wallet; +import com.example.onlyone.domain.wallet.repository.WalletRepository; +import com.example.onlyone.domain.wallet.service.RedisLuaService; +import com.example.onlyone.domain.wallet.service.WalletService; +import com.example.onlyone.global.exception.CustomException; +import com.example.onlyone.global.exception.ErrorCode; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; + +import java.time.LocalDateTime; +import java.util.Map; + +@Log4j2 +@Service +@Transactional +@RequiredArgsConstructor +public class UserSettlementService { + private final UserSettlementRepository userSettlementRepository; + private final WalletRepository walletRepository; + private final WalletService walletService; + private final SettlementRepository settlementRepository; + private final ScheduleRepository scheduleRepository; + private final UserRepository userRepository; + private final RedisLuaService redisLuaService; + + private final OutboxAppender outboxAppender; + private final FailedEventAppender failedEventAppender; + + /** + * 참가자별 개별 정산 처리 (독립 트랜잭션) + * - 성공: UserSettlement.COMPLETED + SUCCESS 이벤트 Outbox + * - 실패: UserSettlement.FAILED + FAILED 이벤트 Outbox + */ + @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class) + public void processParticipantSettlement(Long settlementId, + Long leaderId, + Long leaderWalletId, + Long participantId, + Long amount) { + redisLuaService.withWalletGate(participantId, "capture", 10, () -> { + // 조회 + UserSettlement us = userSettlementRepository + .findBySettlement_SettlementIdAndUser_UserId(settlementId, participantId) + .orElseThrow(() -> new CustomException(ErrorCode.USER_SETTLEMENT_NOT_FOUND)); + // 이미 처리 완료면 멱등 스킵 + if (us.getSettlementStatus() == SettlementStatus.COMPLETED) { + return; + } + User participant = userRepository.findById(participantId) + .orElseThrow(() -> new CustomException(ErrorCode.USER_NOT_FOUND)); + Wallet memberWallet = walletRepository.findByUserWithoutLock(participant) + .orElseThrow(() -> new CustomException(ErrorCode.WALLET_NOT_FOUND)); + Long memberWalletId = memberWallet.getWalletId(); + String operationId = ("stl:%d:usr:%d:v1").formatted(settlementId, participantId); + try { + // 조건부 UPDATE + int captured = walletRepository.captureHold(participantId, amount); + if (captured != 1) { + throw new CustomException(ErrorCode.WALLET_HOLD_CAPTURE_FAILED); + } + // 상태 변경 + us.updateUserSettlement(SettlementStatus.COMPLETED, LocalDateTime.now()); + userSettlementRepository.save(us); + // 성공 이벤트 Outbox 기록 + outboxAppender.append( + "UserSettlement", + us.getUserSettlementId(), + "ParticipantSettlementResult", + String.valueOf(memberWalletId), + Map.of( + "type", "SUCCESS", + "operationId", operationId, + "occurredAt", java.time.Instant.now().toString(), + "settlementId", settlementId, + "userSettlementId", us.getUserSettlementId(), + "participantId", participantId, + "memberWalletId", memberWalletId, + "leaderId", leaderId, + "leaderWalletId", leaderWalletId, + "amount", amount + ) + ); + } catch (Exception e) { + userSettlementRepository.updateStatusIfRequested(us.getUserSettlementId(), SettlementStatus.FAILED); + failedEventAppender.appendFailedUserSettlementEvent( + settlementId, us.getUserSettlementId(), participantId, + memberWalletId, leaderId, leaderWalletId, amount + ); + throw e; + } + }); + } + +// +// /** +// * 참가자별 개별 정산 처리 (독립 트랜잭션) +// */ +// @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class) +// public void processParticipantSettlement(Long settlementId, Long leaderWalletId, Long participantId, Long amount) { +// redisLuaService.withWalletGate(participantId, "capture", 10, () -> { +// UserSettlement userSettlement = userSettlementRepository +// .findBySettlement_SettlementIdAndUser_UserId(settlementId, participantId) +// .orElseThrow(() -> new CustomException(ErrorCode.USER_SETTLEMENT_NOT_FOUND)); +// +// // 이미 처리된 경우 스킵 (멱등성) +// if (userSettlement.getSettlementStatus() == SettlementStatus.COMPLETED) { +// return; +// } +// // 홀드 캡처 (차감) +// int captured = walletRepository.captureHold(participantId, amount); +// if (captured != 1) { +// throw new CustomException(ErrorCode.WALLET_HOLD_CAPTURE_FAILED); +// } +// // 상태 변경 +// userSettlement.updateUserSettlement(SettlementStatus.COMPLETED, LocalDateTime.now()); +// userSettlementRepository.save(userSettlement); +// // 트랜잭션 기록 +// User participant = userRepository.findById(participantId) +// .orElseThrow(() -> new CustomException(ErrorCode.USER_NOT_FOUND)); +// Wallet memberWallet = walletRepository.findByUserWithoutLock(participant) +// .orElseThrow(() -> new CustomException(ErrorCode.WALLET_NOT_FOUND)); +// walletService.createSuccessfulWalletTransactions(memberWallet.getWalletId(), leaderWalletId, amount, userSettlement); +// }); +// } + + /** + * 리더에게 총액 가산 (독립 트랜잭션) + */ + @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class) + public void creditToLeader(Long leaderId, long totalAmount) { + redisLuaService.withWalletGate(leaderId, "credit", 10, () -> { + int credited = walletRepository.creditByUserId(leaderId, totalAmount); + if (credited != 1) { + throw new CustomException(ErrorCode.WALLET_CREDIT_APPLY_FAILED); + } + }); + } +} diff --git a/src/main/java/com/example/onlyone/domain/user/dto/response/MyPageResponse.java b/src/main/java/com/example/onlyone/domain/user/dto/response/MyPageResponse.java index c462b7c6..efdf6dfc 100644 --- a/src/main/java/com/example/onlyone/domain/user/dto/response/MyPageResponse.java +++ b/src/main/java/com/example/onlyone/domain/user/dto/response/MyPageResponse.java @@ -32,5 +32,5 @@ public class MyPageResponse { @JsonProperty("interests_list") private List interestsList; - private Integer balance; + private Long balance; } \ No newline at end of file diff --git a/src/main/java/com/example/onlyone/domain/user/dto/response/MySettlementDto.java b/src/main/java/com/example/onlyone/domain/user/dto/response/MySettlementDto.java index 8179a18b..177f1e83 100644 --- a/src/main/java/com/example/onlyone/domain/user/dto/response/MySettlementDto.java +++ b/src/main/java/com/example/onlyone/domain/user/dto/response/MySettlementDto.java @@ -13,7 +13,7 @@ public class MySettlementDto { private Long clubId; private Long scheduleId; - private int amount; + private Long amount; private String mainImage; private SettlementStatus settlementStatus; private String title; diff --git a/src/main/java/com/example/onlyone/domain/user/service/UserService.java b/src/main/java/com/example/onlyone/domain/user/service/UserService.java index 3432be4b..bd55abe6 100644 --- a/src/main/java/com/example/onlyone/domain/user/service/UserService.java +++ b/src/main/java/com/example/onlyone/domain/user/service/UserService.java @@ -235,7 +235,7 @@ public void signup(SignupRequestDto signupRequest) { // 사용자 지갑 생성 및 웰컴 포인트 100000원 지급 Wallet wallet = Wallet.builder() .user(user) - .postedBalance(100000) + .postedBalance(100000L) .build(); walletRepository.save(wallet); @@ -279,7 +279,7 @@ public MyPageResponse getMyPage() { // 사용자 지갑 정보 조회 Optional walletOpt = walletRepository.findByUserWithoutLock(user); - Integer balance = walletOpt.map(Wallet::getPostedBalance).orElse(0); + Long balance = walletOpt.map(Wallet::getPostedBalance).orElse(0L); return MyPageResponse.builder() .nickname(user.getNickname()) diff --git a/src/main/java/com/example/onlyone/domain/wallet/dto/response/UserWalletTransactionDto.java b/src/main/java/com/example/onlyone/domain/wallet/dto/response/UserWalletTransactionDto.java index 94e13135..59734c2f 100644 --- a/src/main/java/com/example/onlyone/domain/wallet/dto/response/UserWalletTransactionDto.java +++ b/src/main/java/com/example/onlyone/domain/wallet/dto/response/UserWalletTransactionDto.java @@ -17,7 +17,7 @@ public class UserWalletTransactionDto { private String title; private WalletTransactionStatus status; private String mainImage; - private int amount; + private Long amount; private LocalDateTime createdAt; public static UserWalletTransactionDto from(WalletTransaction walletTransaction, String title, String mainImage) { diff --git a/src/main/java/com/example/onlyone/domain/wallet/entity/Wallet.java b/src/main/java/com/example/onlyone/domain/wallet/entity/Wallet.java index 75312129..95428a15 100644 --- a/src/main/java/com/example/onlyone/domain/wallet/entity/Wallet.java +++ b/src/main/java/com/example/onlyone/domain/wallet/entity/Wallet.java @@ -36,15 +36,15 @@ public class Wallet extends BaseTimeEntity { // private int balance; @Column(name = "posted_balance") - private int postedBalance; + private Long postedBalance; @Column(name = "pending_out") - private int pendingOut; + private Long pendingOut; @OneToMany(mappedBy = "wallet", cascade = CascadeType.ALL, orphanRemoval = true) private List walletTransactions = new ArrayList<>(); - public void updateBalance(int balance) { + public void updateBalance(Long balance) { this.postedBalance = balance; } } \ No newline at end of file diff --git a/src/main/java/com/example/onlyone/domain/wallet/entity/WalletTransaction.java b/src/main/java/com/example/onlyone/domain/wallet/entity/WalletTransaction.java index b5a59f83..a68c815a 100644 --- a/src/main/java/com/example/onlyone/domain/wallet/entity/WalletTransaction.java +++ b/src/main/java/com/example/onlyone/domain/wallet/entity/WalletTransaction.java @@ -22,6 +22,9 @@ public class WalletTransaction extends BaseTimeEntity { @Column(name = "wallet_transaction_id", updatable = false) private Long walletTransactionId; + @Column(name = "operation_id", unique = true) + private String operationId; + @Column(name = "type") @NotNull @Enumerated(EnumType.STRING) @@ -29,11 +32,11 @@ public class WalletTransaction extends BaseTimeEntity { @Column(name = "amount") @NotNull - private int amount; + private Long amount; @Column(name = "balance") @NotNull - private int balance; + private Long balance; @Column(name = "status") @NotNull @@ -75,7 +78,7 @@ public void updateStatus(WalletTransactionStatus walletTransactionStatus) { this.walletTransactionStatus = walletTransactionStatus; } - public void update(Type type, int amount, int postedBalance, WalletTransactionStatus walletTransactionStatus, Wallet wallet) { + public void update(Type type, Long amount, Long postedBalance, WalletTransactionStatus walletTransactionStatus, Wallet wallet) { this.type = type; this.amount = amount; this.balance = postedBalance; diff --git a/src/main/java/com/example/onlyone/domain/wallet/repository/WalletTransactionRepository.java b/src/main/java/com/example/onlyone/domain/wallet/repository/WalletTransactionRepository.java index 8735fae6..0f51a910 100644 --- a/src/main/java/com/example/onlyone/domain/wallet/repository/WalletTransactionRepository.java +++ b/src/main/java/com/example/onlyone/domain/wallet/repository/WalletTransactionRepository.java @@ -8,6 +8,11 @@ import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import java.util.Collection; +import java.util.Set; public interface WalletTransactionRepository extends JpaRepository { @@ -28,4 +33,8 @@ Page findByWalletAndWalletTransactionStatus( WalletTransactionStatus walletTransactionStatus, Pageable pageable ); + + @Query("select wt.operationId from WalletTransaction wt where wt.operationId in :operationIds") + Set findExistingOperationIds(@Param("operationIds") Collection operationIds); + } \ No newline at end of file diff --git a/src/main/java/com/example/onlyone/domain/wallet/service/RedisLuaService.java b/src/main/java/com/example/onlyone/domain/wallet/service/RedisLuaService.java new file mode 100644 index 00000000..c260119a --- /dev/null +++ b/src/main/java/com/example/onlyone/domain/wallet/service/RedisLuaService.java @@ -0,0 +1,75 @@ +package com.example.onlyone.domain.wallet.service; + +import com.example.onlyone.global.exception.CustomException; +import com.example.onlyone.global.exception.ErrorCode; +import lombok.RequiredArgsConstructor; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Supplier; + +@Component +@RequiredArgsConstructor +public class RedisLuaService { + private final StringRedisTemplate redis; + private final DefaultRedisScript walletGateAcquireScript; + private final DefaultRedisScript walletGateReleaseScript; + + private static String gateKey(long userId, String op) { + // 클러스터 슬롯 고정 + return "wallet:gate:{" + userId + "}:" + op; + } + + /** 게이트 획득: 성공 시 owner 토큰, 실패 시 null */ + public String acquireWalletGate(long userId, String op, int ttlSec) { + String key = gateKey(userId, op); + String owner = UUID.randomUUID().toString(); + Long ok = redis.execute(walletGateAcquireScript, List.of(key), + String.valueOf(ttlSec), owner); + return (ok != null && ok == 1L) ? owner : null; + } + + /** 게이트 해제: owner 일치할 때만 삭제 */ + public void releaseWalletGate(long userId, String op, String owner) { + if (owner == null) return; + String key = gateKey(userId, op); + redis.execute(walletGateReleaseScript, List.of(key), owner); + } + + /** 게이트 잡고 함수 실행 (+재시도) */ + public T withWalletGate(long userId, String op, int ttlSec, Supplier body) { + final int maxAttempts = 5; + for (int attempt = 1; attempt <= maxAttempts; attempt++) { + String owner = acquireWalletGate(userId, op, ttlSec); + if (owner != null) { + try { + return body.get(); + } finally { + releaseWalletGate(userId, op, owner); + } + } + // acquire 실패한 경우 + if (attempt < maxAttempts) { + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(5, 20)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new CustomException(ErrorCode.WALLET_OPERATION_IN_PROGRESS); + } + } + } + // 모든 시도 실패 시에만 예외 + throw new CustomException(ErrorCode.WALLET_OPERATION_IN_PROGRESS); + } + + + /** void용 */ + public void withWalletGate(long userId, String op, int ttlSec, Runnable body) { + withWalletGate(userId, op, ttlSec, () -> { body.run(); return null; }); + } +} + diff --git a/src/main/java/com/example/onlyone/domain/wallet/service/WalletService.java b/src/main/java/com/example/onlyone/domain/wallet/service/WalletService.java index 8e7de141..10281907 100644 --- a/src/main/java/com/example/onlyone/domain/wallet/service/WalletService.java +++ b/src/main/java/com/example/onlyone/domain/wallet/service/WalletService.java @@ -80,7 +80,7 @@ public UserWalletTransactionDto convertToDto(WalletTransaction walletTransaction } } - public void createSuccessfulWalletTransactions(Long walletId, Long leaderWalletId, int amount, + public void createSuccessfulWalletTransactions(Long walletId, Long leaderWalletId, Long amount, UserSettlement userSettlement) { Wallet wallet = walletRepository.findById(walletId).orElseThrow(); Wallet leaderWallet = walletRepository.findById(leaderWalletId).orElseThrow(); @@ -127,8 +127,8 @@ public void createAndSaveTransfers(UserSettlement userSettlement, WalletTransact @Transactional(propagation = Propagation.REQUIRES_NEW) - public void createFailedWalletTransactions(Long walletId, Long leaderWalletId, int amount, - Long userSettlementId, int walletBalance, int leaderWalletBalance) { + public void createFailedWalletTransactions(Long walletId, Long leaderWalletId, Long amount, + Long userSettlementId, Long walletBalance, Long leaderWalletBalance) { Wallet wallet = walletRepository.getReferenceById(walletId); Wallet leaderWallet = walletRepository.getReferenceById(leaderWalletId); UserSettlement userSettlement = userSettlementRepository.getReferenceById(userSettlementId); diff --git a/src/main/java/com/example/onlyone/global/config/AsyncConfig.java b/src/main/java/com/example/onlyone/global/config/AsyncConfig.java new file mode 100644 index 00000000..46678dcc --- /dev/null +++ b/src/main/java/com/example/onlyone/global/config/AsyncConfig.java @@ -0,0 +1,77 @@ +package com.example.onlyone.global.config; + +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; +import org.springframework.scheduling.annotation.EnableAsync; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +@Configuration +@EnableAsync +public class AsyncConfig { + + /** + * 가상 스레드 기반 정산 실행기 + * - 동시 실행 상한(permits)으로 DB/Redis 백프레셔 + * - 종료 시 작업 완료 대기(awaitSec) + * - Redis 커넥션 팩토리보다 먼저 내려가도록 설정(@DependsOn) + */ + @Bean(name = "settlementExecutor") + @DependsOn("redisConnectionFactory") + public Executor settlementExecutor( + @Value("${app.settlement.concurrency:32}") int permits, + @Value("${app.settlement.shutdown.await-seconds:60}") int awaitSec + ) { + // 스레드 이름 prefix 적용된 가상 스레드 팩토리 + ThreadFactory tf = Thread.ofVirtual().name("settlement-", 0).factory(); + ExecutorService delegate = Executors.newThreadPerTaskExecutor(tf); + return new BoundedVtExecutor(delegate, permits, awaitSec); + } + + /** + * 무제한 가상 스레드에 세마포어로 동시 실행 상한을 주고, + * 종료 시 graceful shutdown을 보장하는 래퍼. + * execute() 호출 스레드를 블로킹하지 않기 위해, + * 실제 대기는 가상 스레드 안에서 수행한다. + */ + static final class BoundedVtExecutor implements Executor, DisposableBean { + private final ExecutorService es; + private final Semaphore sem; + private final int awaitSec; + + BoundedVtExecutor(ExecutorService es, int permits, int awaitSec) { + this.es = es; + this.sem = new Semaphore(permits); + this.awaitSec = awaitSec; + } + + @Override + public void execute(Runnable task) { + // 제출 스레드는 즉시 반환, 가상 스레드 내에서 상한 대기 + es.execute(() -> { + sem.acquireUninterruptibly(); + try { + task.run(); + } finally { + sem.release(); + } + }); + } + + @Override + public void destroy() throws Exception { + es.shutdown(); // 새 작업 받지 않음 + if (!es.awaitTermination(awaitSec, TimeUnit.SECONDS)) { + es.shutdownNow(); + } + } + } +} diff --git a/src/main/java/com/example/onlyone/global/config/RedisLuaConfig.java b/src/main/java/com/example/onlyone/global/config/RedisLuaConfig.java new file mode 100644 index 00000000..cb1f27d4 --- /dev/null +++ b/src/main/java/com/example/onlyone/global/config/RedisLuaConfig.java @@ -0,0 +1,27 @@ +package com.example.onlyone.global.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.ClassPathResource; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.scripting.support.ResourceScriptSource; + +import java.util.List; + +@Configuration +public class RedisLuaConfig { + @Bean + public DefaultRedisScript walletGateAcquireScript() { + var s = new DefaultRedisScript(); + s.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/wallet_gate_acquire.lua"))); + s.setResultType(Long.class); + return s; + } + @Bean + public DefaultRedisScript walletGateReleaseScript() { + var s = new DefaultRedisScript(); + s.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/wallet_gate_release.lua"))); + s.setResultType(Long.class); + return s; + } +} diff --git a/src/main/java/com/example/onlyone/global/config/kafka/KafkaConsumerConfig.java b/src/main/java/com/example/onlyone/global/config/kafka/KafkaConsumerConfig.java new file mode 100644 index 00000000..1f33a1f7 --- /dev/null +++ b/src/main/java/com/example/onlyone/global/config/kafka/KafkaConsumerConfig.java @@ -0,0 +1,90 @@ +package com.example.onlyone.global.config.kafka; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.CommonLoggingErrorHandler; +import org.springframework.kafka.listener.ContainerProperties; + +import java.util.HashMap; +import java.util.Map; + +import static org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE; + +@RequiredArgsConstructor +@Configuration +@EnableKafka // @KafkaListener를 사용하기 위한 조건 +public class KafkaConsumerConfig { + + private final KafkaProperties props; + + @Bean + public ConsumerFactory userSettlementLedgerConsumerFactory() { + return setConsumerFactory(props.getConsumer().getCommonConfig(), props.getSecurity()); + } + + private ConsumerFactory setConsumerFactory(final KafkaProperties.ConsumerCommonConfig c, + final KafkaProperties.Security s) { + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, c.getBootstrapServers()); + config.put(ConsumerConfig.GROUP_ID_CONFIG, c.getGroupId()); + config.put(ConsumerConfig.CLIENT_ID_CONFIG, c.getClientId()); + config.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, c.getTimeoutMs()); + config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, c.getFetchMinBytes()); + config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, c.getFetchMaxWaitMs()); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + config.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + + // 보안 설정 + if (s != null && s.isEnabled()) { + config.put("security.protocol", s.getProtocol()); + config.put("sasl.mechanism", s.getMechanism()); + config.put("sasl.jaas.config", s.getJaas()); + if (s.getSslTruststoreLocation() != null && !s.getSslTruststoreLocation().isBlank()) { + config.put("ssl.truststore.location", s.getSslTruststoreLocation()); + config.put("ssl.truststore.password", s.getSslTruststorePassword()); + } + if (s.getEndpointIdentificationAlgorithm() != null) { + config.put("ssl.endpoint.identification.algorithm", s.getEndpointIdentificationAlgorithm()); + } + } + return new DefaultKafkaConsumerFactory<>(config); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory userSettlementLedgerKafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory f = new ConcurrentKafkaListenerContainerFactory<>(); + // Consumer가 어떤 설정으로 동작할지 지정 + f.setConsumerFactory(userSettlementLedgerConsumerFactory()); + // Batch Mode로 받고 싶은 경우 + f.setBatchListener(true); + f.getContainerProperties().setAckMode(MANUAL_IMMEDIATE); + // Prometheus/Grafana를 위한 메트릭 노출 + f.getContainerProperties().setObservationEnabled(true); + return f; + } + + @Bean + public ConsumerFactory settlementProcessConsumerFactory() { + return setConsumerFactory(props.getConsumer().getCommonConfig(), props.getSecurity()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory settlementProcessKafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory f = new ConcurrentKafkaListenerContainerFactory<>(); + f.setConsumerFactory(settlementProcessConsumerFactory()); + f.setBatchListener(true); + f.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + return f; + } + +} + diff --git a/src/main/java/com/example/onlyone/global/config/kafka/KafkaErrorConfig.java b/src/main/java/com/example/onlyone/global/config/kafka/KafkaErrorConfig.java new file mode 100644 index 00000000..afb89caf --- /dev/null +++ b/src/main/java/com/example/onlyone/global/config/kafka/KafkaErrorConfig.java @@ -0,0 +1,23 @@ +package com.example.onlyone.global.config.kafka; + +import lombok.RequiredArgsConstructor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.util.backoff.FixedBackOff; + +@Configuration +@RequiredArgsConstructor +public class KafkaErrorConfig { + private final KafkaTemplate kafkaTemplate; + + @Bean + public DefaultErrorHandler defaultErrorHandler() { + DeadLetterPublishingRecoverer recoverer = + new DeadLetterPublishingRecoverer(kafkaTemplate); + var backoff = new FixedBackOff(5_000L, 3L); // 5s x 3회 + return new DefaultErrorHandler(recoverer, backoff); + } +} diff --git a/src/main/java/com/example/onlyone/global/config/kafka/KafkaProducerConfig.java b/src/main/java/com/example/onlyone/global/config/kafka/KafkaProducerConfig.java new file mode 100644 index 00000000..8f160af5 --- /dev/null +++ b/src/main/java/com/example/onlyone/global/config/kafka/KafkaProducerConfig.java @@ -0,0 +1,57 @@ +package com.example.onlyone.global.config.kafka; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +@RequiredArgsConstructor +@Configuration +public class KafkaProducerConfig { + + private final KafkaProperties props; + + @Bean + public ProducerFactory ledgerProducerFactory() { + var producer = props.getProducer().getCommonConfig(); + var security = props.getSecurity(); + + Map config = new HashMap<>(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producer.getBootstrapServers()); + config.put(ProducerConfig.CLIENT_ID_CONFIG, producer.getClientId()); + config.put(ProducerConfig.ACKS_CONFIG, producer.getAcks()); + config.put(ProducerConfig.LINGER_MS_CONFIG, producer.getLingerMs()); + config.put(ProducerConfig.BATCH_SIZE_CONFIG, producer.getBatchSize()); + config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); + config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + // 보안 + if (security != null && security.isEnabled()) { + config.put("security.protocol", security.getProtocol()); + config.put("sasl.mechanism", security.getMechanism()); + config.put("sasl.jaas.config", security.getJaas()); + if (security.getSslTruststoreLocation() != null && !security.getSslTruststoreLocation().isBlank()) { + config.put("ssl.truststore.location", security.getSslTruststoreLocation()); + config.put("ssl.truststore.password", security.getSslTruststorePassword()); + } + } + var pf = new DefaultKafkaProducerFactory(config); + pf.setTransactionIdPrefix(props.getProducer().getCommonConfig().getTransactionalIdPrefix()); + return pf; + } + + @Bean + public KafkaTemplate ledgerKafkaTemplate() { + return new KafkaTemplate<>(ledgerProducerFactory()); + } +} diff --git a/src/main/java/com/example/onlyone/global/config/kafka/KafkaProperties.java b/src/main/java/com/example/onlyone/global/config/kafka/KafkaProperties.java new file mode 100644 index 00000000..ec5b05c9 --- /dev/null +++ b/src/main/java/com/example/onlyone/global/config/kafka/KafkaProperties.java @@ -0,0 +1,73 @@ +package com.example.onlyone.global.config.kafka; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Data +@Component +@ConfigurationProperties(prefix = "spring.kafka") +public class KafkaProperties { + + private String defaultBootstrapServers; + private Consumer consumer = new Consumer(); + private Producer producer = new Producer(); + private Security security = new Security(); + + @Data + public static class Consumer { + private ConsumerCommonConfig commonConfig = new ConsumerCommonConfig(); + private ConsumerConfig userSettlementLedgerConsumerConfig = new ConsumerConfig(); + } + + @Data + public static class Producer { + private ProducerCommonConfig commonConfig = new ProducerCommonConfig(); + private ProducerConfig settlementProcessProducerConfig = new ProducerConfig(); + // 필요 시 다른 프로듀서 config 추가 + } + + @Data + public static class ConsumerCommonConfig { + private String groupId; + private String clientId; + private List bootstrapServers; + private String timeoutMs; + private int fetchMinBytes; + private int fetchMaxWaitMs; + } + + @Data + public static class ConsumerConfig { + private String topic; + } + + @Data + public static class ProducerCommonConfig { + private String clientId; + private List bootstrapServers; + private String transactionalIdPrefix; + private String acks; + private Integer lingerMs; + private Integer batchSize; + } + + @Data + public static class ProducerConfig { + private String topic; + } + + @Data + public static class Security { + private boolean enabled; + private String protocol; // SASL_SSL, etc. + private String mechanism; // PLAIN, SCRAM-SHA-512, etc. + private String jaas; // JAAS config string + private String sslTruststoreLocation; + private String sslTruststorePassword; + private String endpointIdentificationAlgorithm; // https + } +} + diff --git a/src/main/java/com/example/onlyone/global/config/kafka/KafkaTopicConfig.java b/src/main/java/com/example/onlyone/global/config/kafka/KafkaTopicConfig.java new file mode 100644 index 00000000..373b139a --- /dev/null +++ b/src/main/java/com/example/onlyone/global/config/kafka/KafkaTopicConfig.java @@ -0,0 +1,55 @@ +package com.example.onlyone.global.config.kafka; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.common.config.TopicConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.KafkaAdmin; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +@Configuration +@RequiredArgsConstructor +public class KafkaTopicConfig { + + private final KafkaProperties props; + + @Bean + public KafkaAdmin kafkaAdmin() { + Map cfg = new HashMap<>(); + // 부트스트랩 서버 + var servers = props.getProducer().getCommonConfig().getBootstrapServers(); + cfg.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", servers)); + return new KafkaAdmin(cfg); + } + + // settlement.process.v1 + @Bean + public org.apache.kafka.clients.admin.NewTopic settlementProcessTopic() { + String topic = props.getProducer().getSettlementProcessProducerConfig().getTopic(); + return TopicBuilder.name(topic) + .partitions(12) + .replicas(1) + .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1") + .config(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) + .config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Duration.ofDays(7).toMillis())) + .build(); + } + + // user-settlement.result.v1 + @Bean + public org.apache.kafka.clients.admin.NewTopic userSettlementResultTopic() { + String topic = props.getConsumer().getUserSettlementLedgerConsumerConfig().getTopic(); + return TopicBuilder.name(topic) + .partitions(24) + .replicas(1) + .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1") + .config(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) + .config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Duration.ofDays(14).toMillis())) + .build(); + } +} \ No newline at end of file diff --git a/src/main/java/com/example/onlyone/global/config/kafka/OutboxRelayConfig.java b/src/main/java/com/example/onlyone/global/config/kafka/OutboxRelayConfig.java new file mode 100644 index 00000000..c288367e --- /dev/null +++ b/src/main/java/com/example/onlyone/global/config/kafka/OutboxRelayConfig.java @@ -0,0 +1,51 @@ +package com.example.onlyone.global.config.kafka; + + +import com.example.onlyone.domain.settlement.entity.OutboxStatus; +import com.example.onlyone.domain.settlement.repository.OutboxRepository; +import com.example.onlyone.global.exception.CustomException; +import com.example.onlyone.global.exception.ErrorCode; +import lombok.RequiredArgsConstructor; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +@Component +@RequiredArgsConstructor +public class OutboxRelayConfig { + + private final OutboxRepository outboxRepository; + private final KafkaTemplate kafkaTemplate; + private final KafkaProperties props; + + @Scheduled(fixedDelay = 200) + @Transactional + public void publishBatch() { + var batch = outboxRepository.pickNewForUpdateSkipLocked(200); + if (batch.isEmpty()) return; + kafkaTemplate.executeInTransaction(kafkaOperation -> { + batch.forEach(e -> { + String topic = routeTopic(e.getEventType()); + kafkaOperation.send(topic, e.getKeyString(), e.getPayload()); + }); + return null; + }); + + batch.forEach(e -> { + e.setStatus(OutboxStatus.PUBLISHED); + e.setPublishedAt(LocalDateTime.now()); + }); + } + + private String routeTopic(String eventType) { + return switch (eventType) { + case "ParticipantSettlementResult" -> props.getConsumer() + .getUserSettlementLedgerConsumerConfig().getTopic(); + case "SettlementProcessEvent" -> props.getProducer() + .getSettlementProcessProducerConfig().getTopic(); + default -> throw new CustomException(ErrorCode.INVALID_TOPIC); + }; + } +} diff --git a/src/main/java/com/example/onlyone/global/exception/ErrorCode.java b/src/main/java/com/example/onlyone/global/exception/ErrorCode.java index 6d29bb82..812a5724 100644 --- a/src/main/java/com/example/onlyone/global/exception/ErrorCode.java +++ b/src/main/java/com/example/onlyone/global/exception/ErrorCode.java @@ -71,6 +71,7 @@ public enum ErrorCode { SETTLEMENT_NOT_FOUND(404, "SETTLEMENT_404_1", "정산을 찾을 수 없습니다."), USER_SETTLEMENT_NOT_FOUND(404, "SETTLEMENT_404_2", "정산 참여자를 찾을 수 없습니다."), ALREADY_SETTLED_USER(409, "SETTLEMENT_409_1", "이미 해당 정기 모임에 대해 정산한 유저입니다."), + ALREADY_COMPLETED_SETTLEMENT(409, "SETTLEMENT_409_2", "이미 종료된 정산입니다."), SETTLEMENT_PROCESS_FAILED(500, "SETTLEMENT_500_1", "정산 처리 중 오류가 발생했습니다. 다시 시도해 주세요."), // Wallet @@ -80,6 +81,7 @@ public enum ErrorCode { WALLET_HOLD_STATE_CONFLICT(409, "WALLET_409_2", "사용자의 예약금이 부족합니다. 포인트를 충전해 주세요."), WALLET_HOLD_CAPTURE_FAILED(409, "WALLET_409_3", "사용자의 예약금 차감에 실패했습니다. 다시 시도해 주세요."), WALLET_CREDIT_APPLY_FAILED(409, "WALLET_409_4", "리더의 정산금 처리에 실패했습니다. 다시 시도해 주세요."), + WALLET_OPERATION_IN_PROGRESS(409, "WALLET_409_5", "사용자의 다른 거래가 처리 중입니다. 잠시 후 다시 시도해 주세요."), // Payment PAYMENT_IN_PROGRESS(202, "PAYMENT_202_1", "결제 처리 중입니다. 잠시 후 다시 조회해 주세요."), @@ -144,7 +146,9 @@ public enum ErrorCode { // Database DATABASE_CONNECTION_ERROR(503, "DB_503_1", "데이터베이스 연결 중 오류가 발생했습니다."), - ; + // Outbox + INVALID_TOPIC(400, "OUTBOX_400_1", "유효하지 않은 토픽입니다."), + INVALID_EVENT_PAYLOAD(422, "OUTBOX_422_1", "잘못된 이벤트 페이로드입니다."); private final int status; private final String code; diff --git a/src/main/resources/luascript/wallet_gate_acquire.lua b/src/main/resources/luascript/wallet_gate_acquire.lua new file mode 100644 index 00000000..be1cd52c --- /dev/null +++ b/src/main/resources/luascript/wallet_gate_acquire.lua @@ -0,0 +1,3 @@ +-- KEYS[1] = gate key, ARGV[1] = ttlSec, ARGV[2] = owner +local ok = redis.call('set', KEYS[1], ARGV[2], 'EX', ARGV[1], 'NX') +if ok then return 1 else return 0 end diff --git a/src/main/resources/luascript/wallet_gate_release.lua b/src/main/resources/luascript/wallet_gate_release.lua new file mode 100644 index 00000000..1880ecbd --- /dev/null +++ b/src/main/resources/luascript/wallet_gate_release.lua @@ -0,0 +1,6 @@ +-- KEYS[1] = gate key, ARGV[1] = owner +if redis.call('get', KEYS[1]) == ARGV[1] then + return redis.call('del', KEYS[1]) +else + return 0 +end