@@ -2,7 +2,6 @@ package io.github.hyungkishin.transentia.common.outbox.transfer

data class ClaimedRow(
val eventId: Long,
val aggregateId: String,
val payload: String,
val headers: String,
val attemptCount: Int = 0
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -82,8 +82,8 @@ services:
command: >
bash -c "
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic transfer-complete-events --partitions 8 --replication-factor 1 &&
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic transfer-transaction-events --partitions 3 --replication-factor 1 &&
echo 'topics created'
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic transfer-transaction-events --partitions 8 --replication-factor 1 &&
echo 'Topics created: transfer-complete-events (8 partitions), transfer-transaction-events (8 partitions) - for 2000 TPS target'
"
restart: "no"
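
A rough sizing sketch behind the 8-partition choice (the per-thread rate below is an assumption, not a figure from this change): if one consumer thread sustains on the order of 250 analyzed events per second, then 8 partitions × ~250 events/s ≈ 2,000 events/s, which is why the partition count, rather than the number of instances, sets the ceiling for the 2000 TPS target.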

3 changes: 1 addition & 2 deletions docs/etc/송금도메인 이벤트 정리.md
@@ -51,9 +51,8 @@ These are events the FDS (fraud detection) side absolutely requires.
- Whether the event has since been published is tracked solely by published_at

## Fields stored in the Outbox
- `event_id` : Snowflake-based ID
- `event_id` : Snowflake-based ID (Transaction ID)
- `aggregate_type` : "Transfer" (which Aggregate the event belongs to)
- `aggregate_id` : Transaction ID
- `event_type` : "TransferRequested", "TransferCompleted", "TransferFailed"
- `payload` : the JSON-serialized result of the above
- `headers` : traceId, correlationId, etc.
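
To make the field list concrete, here is a minimal Kotlin sketch of a row with this shape (the type name TransferOutboxRecord and the publishedAt property are illustrative assumptions, not the repository's actual entity):

import java.time.Instant

// Illustrative sketch only; field names mirror the list above.
data class TransferOutboxRecord(
    val eventId: Long,          // Snowflake-based ID, also used as the Transaction ID
    val aggregateType: String,  // "Transfer"
    val eventType: String,      // "TransferRequested" | "TransferCompleted" | "TransferFailed"
    val payload: String,        // JSON-serialized event body
    val headers: String,        // traceId, correlationId, ...
    val publishedAt: Instant?   // null until the relay publishes the row
)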
@@ -11,6 +11,7 @@ import org.springframework.kafka.config.KafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
import org.springframework.kafka.listener.ContainerProperties
import java.io.Serializable

@Configuration
@@ -22,21 +23,50 @@ class KafkaConsumerConfig<K : Serializable, V : SpecificRecordBase>(
@Bean
fun consumerConfigs(): Map<String, Any> {
return mutableMapOf<String, Any>().apply {
// Basic settings
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigData.bootstrapServers)
put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerConfigData.consumerGroupId)
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConsumerConfigData.keyDeserializer)
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConsumerConfigData.valueDeserializer)
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerConfigData.autoOffsetReset)

// Avro settings
put(kafkaConfigData.schemaRegistryUrlKey, kafkaConfigData.schemaRegistryUrl)
put(kafkaConsumerConfigData.specificAvroReaderKey, kafkaConsumerConfigData.specificAvroReader)

// Consumer group management
put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConsumerConfigData.sessionTimeoutMs)
put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaConsumerConfigData.heartbeatIntervalMs)
put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerConfigData.maxPollIntervalMs)
put(
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
kafkaConsumerConfigData.maxPartitionFetchBytesDefault * kafkaConsumerConfigData.maxPartitionFetchBytesBoostFactor

// Fetch settings
put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
kafkaConsumerConfigData.maxPartitionFetchBytesDefault *
kafkaConsumerConfigData.maxPartitionFetchBytesBoostFactor
)
put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerConfigData.maxPollRecords)

// Minimum fetch size: 1 KB
// - The broker waits until at least this much data has accumulated
// - Too small adds network overhead; too large adds latency
put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024)

// Maximum fetch wait time: 500 ms
// - The broker responds after this time even if fetch.min.bytes has not been reached
// - Balances latency against throughput
put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500)

// Disable auto commit (manual control)
// - Controlled via Spring Kafka's AckMode
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)

// Isolation level: read_committed
// - Only messages from committed transactions are read
// - Guarantees data consistency
put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")

// Client ID (for monitoring)
put(ConsumerConfig.CLIENT_ID_CONFIG, "fds-consumer-\${spring.application.name}")
}
}

Expand All @@ -45,14 +75,35 @@ class KafkaConsumerConfig<K : Serializable, V : SpecificRecordBase>(
return DefaultKafkaConsumerFactory(consumerConfigs())
}

/**
* Kafka listener container factory for single-event processing
*
* - Batch listener: false (single events)
* - Concurrency: 8 (one thread per partition)
* - AckMode: MANUAL_IMMEDIATE (manual commit, immediate)
*/
@Bean
fun kafkaListenerContainerFactory(): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>> {
val factory = ConcurrentKafkaListenerContainerFactory<K, V>()

factory.consumerFactory = consumerFactory()

// Single-event processing
factory.isBatchListener = kafkaConsumerConfigData.batchListener

// Concurrency (match the partition count)
factory.setConcurrency(kafkaConsumerConfigData.concurrencyLevel)

// Auto startup
factory.setAutoStartup(kafkaConsumerConfigData.autoStartup)
factory.containerProperties.pollTimeout = kafkaConsumerConfigData.pollTimeoutMs

// Container properties
factory.containerProperties.apply {
pollTimeout = kafkaConsumerConfigData.pollTimeoutMs
ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
}

return factory
}
}

}
@@ -18,11 +18,6 @@
},
"doc": "Type of transfer event"
},
{
"name": "aggregateId",
"type": "string",
"doc": "Transaction aggregate ID as string"
},
{
"name": "transactionId",
"type": "long",
@@ -27,6 +27,8 @@ class AnalyzeTransferService(
// TODO: edge cases -> alerting + logging + model training + admin handling!
// Did every bank really build this themselves? There must be established patterns for detecting anomalous transactions.

// NOTE: a Hive-style big-data platform <- the evidentiary basis for the data
// When tracing 10 years of full account history -> an unspecified set of counterparties -> a relationship graph via queryBase (this is where pushing performance up gets expensive)
// LAG + LLM

// Fetch all active rules
@@ -2,68 +2,66 @@ package io.github.hyungkishin.transentia.infra.event

import io.github.hyungkishin.transentia.application.service.AnalyzeTransferService
import io.github.hyungkishin.transentia.infrastructure.kafka.model.TransferEventAvroModel
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.messaging.handler.annotation.Headers
import org.springframework.kafka.support.Acknowledgment
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component

@Component
class TransferKafkaListener(
@Value("\${app.transfer.topic}") private val transferTopic: String,
private val analyzeTransferService: AnalyzeTransferService,
private val transferEventMapper: TransferEventMapper,
) {
private val log = LoggerFactory.getLogger(javaClass)

/**
* TODO: performance on the event-consuming side
* TODO: duplicate-message handling (currently too risky) -> need safeguards
* TODO: verify offset updates (check with big-data tools)
*
* - How to balance throughput between the producing and consuming sides
* - How many producers, how many consumers, how many partitions
*/
@KafkaListener(
id = "\${kafka-consumer-config.consumer-group-id}",
topics = ["\${app.transfer.topic}"],
containerFactory = "kafkaListenerContainerFactory"
)
fun receive(
@Payload messages: List<TransferEventAvroModel>,
@Headers headers: Map<String, Any>
@Payload message: TransferEventAvroModel,
@Header(KafkaHeaders.RECEIVED_PARTITION) partition: Int,
@Header(KafkaHeaders.OFFSET) offset: Long,
@Header(value = "eventType", required = false) eventType: String?,
@Header(value = "X-Trace-Id", required = false) traceId: String?,
consumerRecord: ConsumerRecord<String, TransferEventAvroModel>,
acknowledgment: Acknowledgment?
) {
val eventType = headers["eventType"]?.toString()
val traceId = headers["X-Trace-Id"]?.toString()
try {
log.debug(
"[FDS-Consumer] Received - partition={} offset={} eventId={} traceId={}",
partition, offset, message.eventId, traceId
)

log.info("@@@@@[FDS-Consumer] RECEIVED {} messages, traceId={}", messages.size, traceId)
// Convert to a domain event
val domainEvent = transferEventMapper.toDomain(message)

// TODO: verify offset behavior
messages.forEach { avroMessage ->
try {
log.info(
"@@@@@[FDS-Consumer] Processing eventId={} amount={} status={}",
avroMessage.eventId, avroMessage.amount, avroMessage.status
)
// Run FDS analysis
val riskLog = analyzeTransferService.analyze(domainEvent)

val domainEvent = transferEventMapper.toDomain(avroMessage)
log.info(
"[FDS-Consumer] Analysis complete - eventId={} decision={} hits={}",
domainEvent.eventId,
riskLog.decision,
riskLog.ruleHits.size,
)

val riskLog = analyzeTransferService.analyze(domainEvent)
// Manual commit (when AckMode is MANUAL_IMMEDIATE)
acknowledgment?.acknowledge()

log.info(
"[FDS-Consumer] Analysis complete - eventId={} decision={} hits={}",
domainEvent.eventId, riskLog.decision, riskLog.ruleHits.size
)
// TODO: issues when Thread.sleep is introduced -> measuring across multiple instances is the clearer approach
// TODO: Docker -> 3 instances -> check logs

} catch (e: Exception) {
// TODO: on exception, verify how Kafka failures are handled
// TODO: check the impact on the Kafka side
log.error("[FDS-Consumer] Analysis failed - eventId={}", avroMessage.eventId, e)
// Propagate the exception for reprocessing
throw e
}
} catch (e: Exception) {
log.error(
"[FDS-Consumer] Analysis failed - partition={} offset={} eventId={} error={}",
partition, offset, message.eventId, e.message, e
)
// Propagate on exception so the record is reprocessed
throw e
}
}

}
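
Because the listener rethrows failures so the record is redelivered, one way to bound that retry loop in Spring Kafka is a DefaultErrorHandler with a fixed back-off plus a dead-letter recoverer. The sketch below is an assumption about how this could be wired, not something this PR adds; the bean would still have to be registered on the container factory via setCommonErrorHandler.

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.FixedBackOff

@Configuration
class KafkaErrorHandlingConfig {

    // Hypothetical sketch: retry a failed record three times, 1s apart,
    // then publish it to the topic's ".DLT" counterpart instead of redelivering forever.
    @Bean
    fun kafkaErrorHandler(template: KafkaTemplate<Any, Any>): DefaultErrorHandler {
        val recoverer = DeadLetterPublishingRecoverer(template)
        return DefaultErrorHandler(recoverer, FixedBackOff(1000L, 3L))
    }
}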
37 changes: 25 additions & 12 deletions services/fds/instances/api/src/main/resources/application.yml
@@ -9,12 +9,25 @@ spring:
username: postgres
password: pass1234
driver-class-name: org.postgresql.Driver
# HikariCP tuning
hikari:
maximum-pool-size: 20
minimum-idle: 10
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000

jpa:
open-in-view: false
hibernate:
ddl-auto: none
show-sql: true
show-sql: false
properties:
hibernate:
jdbc:
batch_size: 20
order_inserts: true
order_updates: true

flyway:
enabled: false
@@ -23,35 +36,35 @@ spring:

logging:
level:
org.hibernate.SQL: DEBUG
org.hibernate.type.descriptor.sql.BasicBinder: TRACE
org.hibernate.SQL: INFO
org.hibernate.type.descriptor.sql.BasicBinder: INFO
org.springframework.kafka: INFO
io.github.hyungkishin.transentia: DEBUG
io.github.hyungkishin.transentia: INFO

kafka-config:
bootstrap-servers: host.docker.internal:9094
schema-registry-url-key: schema.registry.url
schema-registry-url: http://localhost:8085
num-of-partitions: 8
num-of-partitions: 3
replication-factor: 1

kafka-consumer-config:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
consumer-group-id: fds-consumer-group
auto-offset-reset: earliest
specific-avro-reader-key: specific.avro.reader
specific-avro-reader: true
batch-listener: true

batch-listener: false
auto-startup: true
concurrency-level: 2 # use 4 for 2000 TPS
concurrency-level: 3
max-poll-records: 100
max-partition-fetch-bytes-default: 1048576
max-partition-fetch-bytes-boost-factor: 1
poll-timeout-ms: 1000
session-timeout-ms: 10000
heartbeat-interval-ms: 3000
max-poll-interval-ms: 300000
max-poll-records: 500
max-partition-fetch-bytes-default: 1048576
max-partition-fetch-bytes-boost-factor: 1
poll-timeout-ms: 500

app:
transfer:
22 changes: 1 addition & 21 deletions services/transfer/application/build.gradle.kts
@@ -1,30 +1,10 @@
//plugins {
// kotlin("jvm")
// kotlin("plugin.spring")
//}
//
//dependencies {
// implementation(project(":transfer-domain"))
// implementation(project(":common-application"))
// implementation(project(":common-domain"))
//
// implementation("org.springframework:spring-context")
// implementation("org.springframework:spring-tx")
//
// testImplementation("io.kotest:kotest-runner-junit5")
// testImplementation("io.kotest:kotest-assertions-core")
//}

plugins {
id("transentia.spring-library")
}

dependencies {
// Project dependencies
implementation(project(":transfer-domain"))
implementation(project(":common-application"))
implementation(project(":common-domain"))

// Specialized dependencies (add here if any)
// e.g. implementation("org.springframework.retry:spring-retry")
implementation(project(":kafka-model"))
}