Skip to content

Commit

Permalink
CHanges for monitoring cesc data
Browse files Browse the repository at this point in the history
  • Loading branch information
HariharanAnantharaman committed Jun 27, 2018
1 parent a6f7752 commit 48163d1
Show file tree
Hide file tree
Showing 12 changed files with 628 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,8 @@ else if("roomambience".equalsIgnoreCase(metrics)){
}
else if("batteryhealth".equalsIgnoreCase(metrics)){
returnVal="battery_health";
}else if ("transformer".equalsIgnoreCase(metrics)){
returnVal="transformer_telemetry";
}
return returnVal;
}
Expand Down
44 changes: 44 additions & 0 deletions connectionstatistics/src/main/resources/application-cesc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
server:
port: 8100



logging:
level:
org.springframework.security: DEBUG
log4j.logger.org.hibernate.SQL: DEBUG
com.techolution: DEBUG

spring:
jpa:
show-sql: true
hibernate:
ddl-auto: update
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL5Dialect
datasource:
url: jdbc:mysql://localhost:3306/mauritius_smartwater
username: admin
password: admin123
driver-class-name: com.mysql.jdbc.Driver
tomcat:
max-active: 5
hikari:
maximum-pool-size: 5
application:
name: connectionstatisticsservice




influx:
url: http://52.170.92.62:8086
username: root
password: root
dbname: cesc_qa
datatimezone: Asia/Calcutta
retentionpolicy: aRetentionPolicy2



100 changes: 100 additions & 0 deletions mqttKafkaBridge/src/main/java/com/m2mci/mqttKafkaBridge/Bridge.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.m2mci.mqttKafkaBridge;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import kafka.producer.ProducerConfig;

import org.apache.log4j.Logger;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.kohsuke.args4j.CmdLineException;

public class Bridge implements MqttCallback {
private Logger logger = Logger.getLogger(this.getClass().getName());
private MqttAsyncClient mqtt;
private Producer<String, Message> kafkaProducer;

private void connect(String serverURI, String clientId, String zkConnect) throws MqttException {
mqtt = new MqttAsyncClient(serverURI, clientId);
mqtt.setCallback(this);
IMqttToken token = mqtt.connect();
Properties props = new Properties();
props.put("zk.connect", zkConnect);
props.put("serializer.class", "kafka.serializer.DefaultEncoder");
ProducerConfig config = new ProducerConfig(props);
kafkaProducer = new Producer<String, Message>(config);
token.waitForCompletion();
logger.info("Connected to MQTT and Kafka");
}

private void reconnect() throws MqttException {
IMqttToken token = mqtt.connect();
token.waitForCompletion();
}

private void subscribe(String[] mqttTopicFilters) throws MqttException {
int[] qos = new int[mqttTopicFilters.length];
for (int i = 0; i < qos.length; ++i) {
qos[i] = 0;
}
mqtt.subscribe(mqttTopicFilters, qos);
}

@Override
public void connectionLost(Throwable cause) {
logger.warn("Lost connection to MQTT server", cause);
while (true) {
try {
logger.info("Attempting to reconnect to MQTT server");
reconnect();
logger.info("Reconnected to MQTT server, resuming");
return;
} catch (MqttException e) {
logger.warn("Reconnect failed, retrying in 10 seconds", e);
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
}
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub

}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
byte[] payload = message.getPayload();
ProducerData<String, Message> data = new ProducerData<String, Message>(topic, new Message(payload));
kafkaProducer.send(data);
}

/**
* @param args
*/
public static void main(String[] args) {
CommandLineParser parser = null;
try {
parser = new CommandLineParser();
parser.parse(args);
Bridge bridge = new Bridge();
bridge.connect(parser.getServerURI(), parser.getClientId(), parser.getZkConnect());
bridge.subscribe(parser.getMqttTopicFilters());
} catch (MqttException e) {
e.printStackTrace(System.err);
} catch (CmdLineException e) {
System.err.println(e.getMessage());
parser.printUsage(System.err);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.m2mci.mqttKafkaBridge;

import java.io.OutputStream;
import java.io.PrintStream;

import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;

public class CommandLineParser {
private static final String ALL_MQTT_TOPICS = "#";
private static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181";
private static final String DEFAULT_MQTT_SERVER_URI = "tcp://localhost:1883";

@Option(name="--id", usage="MQTT Client ID")
private String clientId = "mqttKafkaBridge";

@Option(name="--uri", usage="MQTT Server URI")
private String serverURI = DEFAULT_MQTT_SERVER_URI;

@Option(name="--zk", usage="Zookeeper connect string")
private String zkConnect = DEFAULT_ZOOKEEPER_CONNECT;

@Option(name="--topics", usage="MQTT topic filters (comma-separated)")
private String mqttTopicFilters = ALL_MQTT_TOPICS;

@Option(name="--help", aliases="-h", usage="Show help")
private boolean showHelp = false;

private CmdLineParser parser = new CmdLineParser(this);

public String getClientId() {
return clientId;
}

public String getServerURI() {
return serverURI;
}

public String getZkConnect() {
return zkConnect;
}

public String[] getMqttTopicFilters() {
return mqttTopicFilters.split(",");
}

public void parse(String[] args) throws CmdLineException {
parser.parseArgument(args);
if (showHelp) {
printUsage(System.out);
System.exit(0);
}
}

public void printUsage(OutputStream out) {
PrintStream stream = new PrintStream(out);
stream.println("java " + Bridge.class.getName() + " [options...]");
parser.printUsage(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.springframework.core.env.Environment;

import com.techolution.smartoffice.adapter.callback.BMSMqttCallBack;
import com.techolution.smartoffice.adapter.callback.DefaultMqttCallBack;
import com.techolution.smartoffice.adapter.callback.SmartOfficeMqttCallBack;


Expand All @@ -28,6 +29,9 @@ public class MqttkafkaadapterApplication implements CommandLineRunner{
@Autowired
BMSMqttCallBack bmsMqttCallBack;

@Autowired
DefaultMqttCallBack cescMqttCallBack;

@Autowired
private Environment environment;

Expand All @@ -45,7 +49,11 @@ public void run(String... args) throws Exception {
if((environment.getActiveProfiles())[0].contains("bms")){
bmsMqttCallBack.connect();
logger.debug("Registed BMS callback");
}else{
} else if((environment.getActiveProfiles())[0].contains("cesc")){
cescMqttCallBack.connect();
logger.debug("Registed cescMqttCallBack callback");
}
else{
smartOfficeMqttCallBack.connect();
logger.debug("Registed smartoffice callback");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package com.techolution.smartoffice.adapter.callback;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;


import com.techolution.smartoffice.adapter.CustomProperties;




public abstract class AbstractMqttCallBack implements MqttCallback {

private Log log = LogFactory.getLog(AbstractMqttCallBack.class);

@Autowired
CustomProperties customProperties;

protected MqttAsyncClient mqtt;


@Autowired
protected KafkaTemplate<String, String> template;


@Override
public void connectionLost(Throwable arg0) {
// TODO Auto-generated method stub
try {
connect();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
// TODO Auto-generated method stub

}

public void subscribe() throws MqttException {
//int[] qos = new int[mqttTopicFilters.length];
/*for (int i = 0; i < qos.length; ++i) {
qos[i] = 0;
}*/
if(mqtt == null)
{
this.connect();
}
if(!mqtt.isConnected()){
mqtt.connect();
}
mqtt.subscribe(customProperties.getMqtttopic(), 0);
}


public void connect() throws MqttException {
log.debug("Broker is"+customProperties.getMqttbroker());
if(customProperties!=null){
mqtt = new MqttAsyncClient(customProperties.getMqttbroker(),customProperties.getMqttclientid());
mqtt.setCallback(this);
IMqttToken token=mqtt.connect();
//log.debug("Connection to "+customProperties.getMqttbroker()+" completed successfully");
mqtt.setCallback(this);
while(!token.isComplete()){
// log.debug("Waiting for connection to complete");
}
mqtt.subscribe(customProperties.getMqtttopic(), 0);

//token.
// log.info("MQTT CLIENT HAS BEEN CONNECTED");
}else{
log.warn("Properties is null");
}



}

/*public SmartOfficeMqttCallBack() {
super();
try {
this.connect();
this.subscribe();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}*/

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.techolution.smartoffice.adapter.callback;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;

@Component
public class DefaultMqttCallBack extends AbstractMqttCallBack {

private Log log = LogFactory.getLog(DefaultMqttCallBack.class);
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// TODO Auto-generated method stub



byte[] payload = message.getPayload();
String messageReceived=new String(payload);
log.debug("Before sending message:"+messageReceived);
//log.debug("Before sending message:"+object.toString());
this.template.send(this.customProperties.getKafkatopic(),messageReceived);

}

}
Loading

0 comments on commit 48163d1

Please sign in to comment.