11package org .scoula .domain .kafka .listener ;
22
3+ import com .fasterxml .jackson .databind .ObjectMapper ;
4+ import org .scoula .domain .cloudwatch .dto .LogMessage ;
35import org .scoula .domain .cloudwatch .service .LogConsumerService ;
46import org .scoula .domain .kafka .producer .KafkaProducer ;
57import org .springframework .kafka .annotation .KafkaListener ;
@@ -16,11 +18,14 @@ public class KafkaLogListener {
1618
1719 private final LogConsumerService logConsumerService ;
1820 private final KafkaProducer kafkaProducer ;
21+ private final ObjectMapper objectMapper = new ObjectMapper ();
22+
1923
2024 @ KafkaListener (topics = "log-module" , groupId = "log-consumer-group" )
2125 public void listen (String message , Acknowledgment ack ) {
2226 try {
23- logConsumerService .loggingIntegration (message );
27+ LogMessage logMessage = objectMapper .readValue (message , LogMessage .class );
28+ logConsumerService .loggingIntegration (logMessage );
2429 ack .acknowledge ();
2530 }catch (Exception e ) {
2631 log .error (e .getMessage ());
@@ -32,7 +37,9 @@ public void listen(String message, Acknowledgment ack) {
3237 @ KafkaListener (topics = "log-retry" , groupId = "log-retry-group" )
3338 public void listenRetry (String message , Acknowledgment ack ) {
3439 try {
35- logConsumerService .loggingIntegration (message );
40+ LogMessage logMessage = objectMapper .readValue (message , LogMessage .class );
41+ //todo retry count > 10 have to go DLQ
42+ logConsumerService .loggingIntegration (logMessage );
3643 ack .acknowledge ();
3744 }catch (Exception e ) {
3845 log .error (e .getMessage ());
0 commit comments