Skip to content

fixes #146 add generic streams, topology, startup hook and shutdown hook #147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions src/main/java/com/networknt/mesh/kafka/GenericLightStreams.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.networknt.mesh.kafka;
import com.networknt.config.Config;
import com.networknt.kafka.common.KafkaStreamsConfig;
import com.networknt.kafka.common.Constants;
import com.networknt.kafka.streams.LightStreams;
import com.networknt.service.SingletonServiceFactory;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class GenericLightStreams implements LightStreams {

private static final Logger logger= LoggerFactory.getLogger(GenericLightStreams.class);

static final KafkaStreamsConfig config = (KafkaStreamsConfig) Config.getInstance().getJsonObjectConfig(KafkaStreamsConfig.CONFIG_NAME, KafkaStreamsConfig.class);
private KafkaStreams kafkaStreams;

@Override
public void start(String ip, int port) {

Properties streamProps=new Properties();
streamProps.putAll(config.getProperties());
streamProps.put(StreamsConfig.APPLICATION_SERVER_CONFIG, ip +":"+port);
streamProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
streamProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
GenericStreamsTopology topology = SingletonServiceFactory.getBean(GenericStreamsTopology.class);
try {
kafkaStreams = new KafkaStreams(topology.buildTopology(), streamProps);
kafkaStreams.setUncaughtExceptionHandler(eh ->{
logger.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", eh);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
if(config.isCleanUp()) {
kafkaStreams.cleanUp();
}
kafkaStreams = startStream(ip, port, topology.buildTopology(), config, topology.getDlqTopicMetadataMap(), Constants.GENERIC_TRANSFORMER);

}catch (Exception e){
logger.error(e.getMessage());
kafkaStreams = null;
}
}

@Override
public void close() {
if(kafkaStreams !=null)
kafkaStreams.close();

}

public KafkaStreams getKafkaStreams() {
return kafkaStreams;
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/networknt/mesh/kafka/GenericStreamsTopology.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.networknt.mesh.kafka;

import com.networknt.kafka.entity.StreamsDLQMetadata;
import org.apache.kafka.streams.Topology;

import java.util.Map;

public interface GenericStreamsTopology {
Topology buildTopology();
Map<String, StreamsDLQMetadata> getDlqTopicMetadataMap();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.networknt.mesh.kafka;

import com.networknt.server.ShutdownHookProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaStreamsShutdownHook implements ShutdownHookProvider {
private static final Logger logger= LoggerFactory.getLogger(KafkaStreamsShutdownHook.class);

@Override
public void onShutdown() {
logger.info("KafkaStreamsShutdownHook Shutdown Begins !!!");

if(null != KafkaStreamsStartupHook.genericLightStreams){
KafkaStreamsStartupHook.genericLightStreams.close();
}
logger.info("KafkaStreamsShutdownHook Ends !!! ");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.networknt.mesh.kafka;

import com.networknt.server.Server;
import com.networknt.server.StartupHookProvider;
import com.networknt.utility.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaStreamsStartupHook implements StartupHookProvider {
private static final Logger logger= LoggerFactory.getLogger(KafkaStreamsStartupHook.class);
public static GenericLightStreams genericLightStreams = new GenericLightStreams();

@Override
public void onStartup() {
logger.info("KafkaStreamsStartupHook Starting !!! ");

int port = Server.getServerConfig().getHttpsPort();
String ip = NetUtils.getLocalAddressByDatagram();
logger.info("ip = {} port = {}",ip, port);
// start the kafka stream process
genericLightStreams.start(ip, port);
logger.info("KafkaStreamsStartupHook onStartup ends.");
}

}