diff --git a/src/main/java/com/networknt/mesh/kafka/GenericLightStreams.java b/src/main/java/com/networknt/mesh/kafka/GenericLightStreams.java new file mode 100644 index 0000000..38419d4 --- /dev/null +++ b/src/main/java/com/networknt/mesh/kafka/GenericLightStreams.java @@ -0,0 +1,59 @@ +package com.networknt.mesh.kafka; +import com.networknt.config.Config; +import com.networknt.kafka.common.KafkaStreamsConfig; +import com.networknt.kafka.common.Constants; +import com.networknt.kafka.streams.LightStreams; +import com.networknt.service.SingletonServiceFactory; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class GenericLightStreams implements LightStreams { + + private static final Logger logger= LoggerFactory.getLogger(GenericLightStreams.class); + + static final KafkaStreamsConfig config = (KafkaStreamsConfig) Config.getInstance().getJsonObjectConfig(KafkaStreamsConfig.CONFIG_NAME, KafkaStreamsConfig.class); + private KafkaStreams kafkaStreams; + + @Override + public void start(String ip, int port) { + + Properties streamProps=new Properties(); + streamProps.putAll(config.getProperties()); + streamProps.put(StreamsConfig.APPLICATION_SERVER_CONFIG, ip +":"+port); + streamProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass()); + streamProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass()); + GenericStreamsTopology topology = SingletonServiceFactory.getBean(GenericStreamsTopology.class); + try { + kafkaStreams = new KafkaStreams(topology.buildTopology(), streamProps); + kafkaStreams.setUncaughtExceptionHandler(eh ->{ + logger.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", eh); + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD; + }); + if(config.isCleanUp()) { + kafkaStreams.cleanUp(); + } + kafkaStreams = startStream(ip, port, topology.buildTopology(), config, topology.getDlqTopicMetadataMap(), Constants.GENERIC_TRANSFORMER); + + }catch (Exception e){ + logger.error(e.getMessage()); + kafkaStreams = null; + } + } + + @Override + public void close() { + if(kafkaStreams !=null) + kafkaStreams.close(); + + } + + public KafkaStreams getKafkaStreams() { + return kafkaStreams; + } +} diff --git a/src/main/java/com/networknt/mesh/kafka/GenericStreamsTopology.java b/src/main/java/com/networknt/mesh/kafka/GenericStreamsTopology.java new file mode 100644 index 0000000..e13373f --- /dev/null +++ b/src/main/java/com/networknt/mesh/kafka/GenericStreamsTopology.java @@ -0,0 +1,11 @@ +package com.networknt.mesh.kafka; + +import com.networknt.kafka.entity.StreamsDLQMetadata; +import org.apache.kafka.streams.Topology; + +import java.util.Map; + +public interface GenericStreamsTopology { + Topology buildTopology(); + Map getDlqTopicMetadataMap(); +} diff --git a/src/main/java/com/networknt/mesh/kafka/KafkaStreamsShutdownHook.java b/src/main/java/com/networknt/mesh/kafka/KafkaStreamsShutdownHook.java new file mode 100644 index 0000000..506eed6 --- /dev/null +++ b/src/main/java/com/networknt/mesh/kafka/KafkaStreamsShutdownHook.java @@ -0,0 +1,19 @@ +package com.networknt.mesh.kafka; + +import com.networknt.server.ShutdownHookProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaStreamsShutdownHook implements ShutdownHookProvider { + private static final Logger logger= LoggerFactory.getLogger(KafkaStreamsShutdownHook.class); + + @Override + public void onShutdown() { + logger.info("KafkaStreamsShutdownHook Shutdown Begins !!!"); + + if(null != KafkaStreamsStartupHook.genericLightStreams){ + KafkaStreamsStartupHook.genericLightStreams.close(); + } + logger.info("KafkaStreamsShutdownHook Ends !!! "); + } +} diff --git a/src/main/java/com/networknt/mesh/kafka/KafkaStreamsStartupHook.java b/src/main/java/com/networknt/mesh/kafka/KafkaStreamsStartupHook.java new file mode 100644 index 0000000..ea3827d --- /dev/null +++ b/src/main/java/com/networknt/mesh/kafka/KafkaStreamsStartupHook.java @@ -0,0 +1,25 @@ +package com.networknt.mesh.kafka; + +import com.networknt.server.Server; +import com.networknt.server.StartupHookProvider; +import com.networknt.utility.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaStreamsStartupHook implements StartupHookProvider { + private static final Logger logger= LoggerFactory.getLogger(KafkaStreamsStartupHook.class); + public static GenericLightStreams genericLightStreams = new GenericLightStreams(); + + @Override + public void onStartup() { + logger.info("KafkaStreamsStartupHook Starting !!! "); + + int port = Server.getServerConfig().getHttpsPort(); + String ip = NetUtils.getLocalAddressByDatagram(); + logger.info("ip = {} port = {}",ip, port); + // start the kafka stream process + genericLightStreams.start(ip, port); + logger.info("KafkaStreamsStartupHook onStartup ends."); + } + +}