diff --git a/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java b/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java
index 5ef76a7e..ba7ab733 100644
--- a/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java
+++ b/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java
@@ -174,15 +174,19 @@ public void setNodeType(String nodeType) {
this.nodeType = nodeType;
}
+ protected String listPropertiesStr() {
+ return "nodeId=" + nodeId + ", hostname=" + hostname + ", ip=" + ip + ", clusterId="
+ + clusterId + ", servicePort=" + servicePort + ", localtime="
+ + localtime + ", rack=" + rack + ", serviceInfo=" + serviceInfo + ", agentSettings="
+ + agentSettings + ", environment=" + environment;
+ }
+
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
- return "NodeInfo [nodeId=" + nodeId + ", hostname=" + hostname + ", ip=" + ip + ", clusterId="
- + clusterId + ", servicePort=" + servicePort + ", localtime="
- + localtime + ", rack=" + rack + ", serviceInfo=" + serviceInfo + ", agentSettings="
- + agentSettings + ", environment=" + environment + "]";
+ return "NodeInfo [" + listPropertiesStr() + "]";
}
}
diff --git a/orion-server/pom.xml b/orion-server/pom.xml
index b9560bdc..28799334 100644
--- a/orion-server/pom.xml
+++ b/orion-server/pom.xml
@@ -102,6 +102,16 @@
+
+ com.clickhouse
+ clickhouse-http-client
+ 0.6.0
+
+
+ org.apache.httpcomponents.client5
+ httpclient5
+ 5.3
+
org.apache.curator
curator-framework
diff --git a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseClusterSensor.java b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseClusterSensor.java
new file mode 100644
index 00000000..10d22f8e
--- /dev/null
+++ b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseClusterSensor.java
@@ -0,0 +1,125 @@
+package com.pinterest.orion.core.automation.sensor.clickhouse;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+
+import java.util.concurrent.ExecutionException;
+
+import com.clickhouse.client.ClickHouseClient;
+import com.clickhouse.client.ClickHouseCredentials;
+import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseProtocol;
+import com.clickhouse.client.ClickHouseResponse;
+import com.clickhouse.data.ClickHouseFormat;
+import com.clickhouse.data.ClickHouseRecord;
+
+import com.pinterest.orion.common.NodeInfo;
+import com.pinterest.orion.core.Node;
+import com.pinterest.orion.core.PluginConfigurationException;
+import com.pinterest.orion.core.clickhouse.ClickHouseCluster;
+import com.pinterest.orion.core.clickhouse.ClickHouseNodeInfo;
+
+public class ClickHouseClusterSensor extends ClickHouseSensor {
+
+ public static final String CLUSTER_COL = "cluster";
+ public static final String SHARD_NUM_COL = "shard_num";
+ public static final String SHARD_WEIGHT_COL = "shard_weight";
+ public static final String REPLICA_NUM_COL = "replica_num";
+ public static final String HOST_NAME_COL = "host_name";
+
+ public static final String CLUSTERS_QUERY = "SELECT * FROM system.clusters WHERE is_local=1";
+
+ private Map config;
+
+ @Override
+ public String getName() {
+ return "clickhouseclustersensor";
+ }
+
+ @Override
+ public void initialize(Map config) throws PluginConfigurationException {
+ super.initialize(config);
+ }
+
+ private void addNodesFromServerSet(ClickHouseCluster cluster, String serversetPath) throws Exception {
+ List lines = Files.readAllLines(new File(serversetPath).toPath());
+ for (String serverStr : lines) {
+ ClickHouseNodeInfo nodeInfo = new ClickHouseNodeInfo();
+ nodeInfo.setNodeId(serverStr);
+ nodeInfo.setClusterId(cluster.getClusterId());
+
+ String[] splits = serverStr.split(":");
+ String ip = splits[0];
+ int port = Integer.parseInt(splits[1]);
+ nodeInfo.setIp(ip);
+ nodeInfo.setServicePort(port);
+
+ queryShardReplicaInfoFromNode(cluster, nodeInfo, ip, port);
+
+ Node node = cluster.getNodeMap().get(serverStr);
+ if (node == null) {
+ logger.info("Adding new node with info " + nodeInfo);
+ } else {
+ logger.info("Updating node; existing info " + node.getCurrentNodeInfo()
+ + ", new info " + nodeInfo);
+ }
+ cluster.addNodeWithoutAgent(nodeInfo);
+ }
+ }
+
+ @Override
+ public void sense(ClickHouseCluster cluster) throws Exception {
+ String serversetPath = cluster.getAttribute(cluster.SERVERSET_PATH).getValue();
+ addNodesFromServerSet(cluster, serversetPath);
+ }
+
+ private void queryShardReplicaInfo(
+ ClickHouseNode server, ClickHouseNodeInfo nodeInfo) throws ClickHouseException {
+ try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol());
+ // each node stores the clusters info in the table system.clusters
+ // here we query the shard/replica info each node stores for itself
+ // TODO: build an auditor action to make sure each node stores
+ // the consistent info about all the other nodes
+ ClickHouseResponse response = client.read(server)
+ .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
+ .query(CLUSTERS_QUERY).execute().get()) {
+ for (ClickHouseRecord r : response.records()) {
+ // note that within each cluster, there can be smaller
+ // logical clusters, e.g. created using `Replicated`
+ String cluster = r.getValue(CLUSTER_COL).asString();
+ int shard = r.getValue(SHARD_NUM_COL).asInteger();
+ int shardWeight = r.getValue(SHARD_WEIGHT_COL).asInteger();
+ int replicaNum = r.getValue(REPLICA_NUM_COL).asInteger();
+ String hostName = r.getValue(HOST_NAME_COL).asString();
+
+ nodeInfo.setHostname(hostName);
+ nodeInfo.addShardReplicaInfo(cluster, shard, shardWeight, replicaNum);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw ClickHouseException.forCancellation(e, server);
+ } catch (ExecutionException e) {
+ throw ClickHouseException.of(e, server);
+ }
+ }
+
+ private void queryShardReplicaInfoFromNode(
+ ClickHouseCluster cluster, ClickHouseNodeInfo nodeInfo, String host, int port)
+ throws ClickHouseException {
+ String user = cluster.getAttribute(cluster.USER).getValue();
+ String password = cluster.getAttribute(cluster.PASSWORD).getValue();
+
+ ClickHouseNode server = ClickHouseNode.builder()
+ .host(host)
+ .port(ClickHouseProtocol.HTTP, port)
+ .database("system").credentials(
+ ClickHouseCredentials.fromUserAndPassword(user, password)
+ ).build();
+ queryShardReplicaInfo(server, nodeInfo);
+ }
+}
diff --git a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseSensor.java b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseSensor.java
new file mode 100644
index 00000000..be6985ca
--- /dev/null
+++ b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseSensor.java
@@ -0,0 +1,21 @@
+package com.pinterest.orion.core.automation.sensor.clickhouse;
+
+import com.pinterest.orion.core.Cluster;
+import com.pinterest.orion.core.automation.sensor.Sensor;
+import com.pinterest.orion.core.clickhouse.ClickHouseCluster;
+
+public abstract class ClickHouseSensor extends Sensor {
+
+ @Override
+ public void observe(Cluster cluster) throws Exception {
+ if (logger == null) {
+ logger = getLogger(cluster);
+ }
+ if(cluster instanceof ClickHouseCluster){
+ sense((ClickHouseCluster) cluster);
+ }
+ }
+
+ public abstract void sense(ClickHouseCluster cluster) throws Exception;
+
+}
diff --git a/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseCluster.java b/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseCluster.java
index 9b1a1d39..d6b3dc8d 100644
--- a/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseCluster.java
+++ b/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseCluster.java
@@ -25,6 +25,15 @@ public class ClickHouseCluster extends Cluster {
private static final Logger logger = Logger.getLogger(ClickHouseNode.class.getName());
public static final String CLUSTER_REGION = "region";
public static final String DEFAULT_REGION = "us-east-1";
+
+ public static final String USER = "user";
+ private static final String DEFAULT_USER = "default";
+
+ public static final String PASSWORD = "password";
+ private static final String DEFAULT_PASSWORD = "";
+
+ public static final String SERVERSET_PATH = "serversetPath";
+
private Map config;
public ClickHouseCluster(String id,
@@ -44,6 +53,9 @@ public ClickHouseCluster(String id,
protected void bootstrapClusterInfo(Map config) throws PluginConfigurationException {
this.config = config;
setAttribute(CLUSTER_REGION, config.getOrDefault(CLUSTER_REGION, DEFAULT_REGION));
+ setAttribute(USER, config.getOrDefault(USER, DEFAULT_USER));
+ setAttribute(PASSWORD, config.getOrDefault(PASSWORD, DEFAULT_PASSWORD));
+ setAttribute(SERVERSET_PATH, config.get(SERVERSET_PATH));
}
@Override
diff --git a/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseNodeInfo.java b/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseNodeInfo.java
new file mode 100644
index 00000000..68c05a2d
--- /dev/null
+++ b/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseNodeInfo.java
@@ -0,0 +1,49 @@
+package com.pinterest.orion.core.clickhouse;
+
+import com.pinterest.orion.common.NodeInfo;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.HashMap;
+
+class ShardReplicaInfo {
+ public int shardNum;
+ public int shardWeight;
+ public int replicaNum;
+
+ ShardReplicaInfo(int shardNum, int shardWeight, int replicaNum) {
+ this.shardNum = shardNum;
+ this.shardWeight = shardWeight;
+ this.replicaNum = replicaNum;
+ }
+
+ public String toString() {
+ return "{shardNum: " + shardNum + ", shardWeight: "
+ + shardWeight + " replicaNum: " + replicaNum + "}";
+ }
+}
+
+public class ClickHouseNodeInfo extends NodeInfo implements Serializable {
+ Map infoByCluster = new HashMap<>();
+
+ public void addShardReplicaInfo(String cluster, int shardNum, int shardWeight, int replicaNum) {
+ infoByCluster.put(cluster, new ShardReplicaInfo(shardNum, shardWeight, replicaNum));
+ }
+
+ public int getShardNum(String cluster) {
+ return infoByCluster.get(cluster).shardNum;
+ }
+
+ public int getShardWeight(String cluster) {
+ return infoByCluster.get(cluster).shardWeight;
+ }
+
+ public int getReplicaNum(String cluster) {
+ return infoByCluster.get(cluster).replicaNum;
+ }
+
+ @Override
+ protected String listPropertiesStr() {
+ return super.listPropertiesStr() + ", infoByCluster=" + infoByCluster.toString();
+ }
+}
diff --git a/orion-server/src/main/java/com/pinterest/orion/server/ClusterTypeMap.java b/orion-server/src/main/java/com/pinterest/orion/server/ClusterTypeMap.java
index a43fad96..dc4f6247 100644
--- a/orion-server/src/main/java/com/pinterest/orion/server/ClusterTypeMap.java
+++ b/orion-server/src/main/java/com/pinterest/orion/server/ClusterTypeMap.java
@@ -24,6 +24,7 @@
import com.pinterest.orion.core.hbase.HBaseCluster;
import com.pinterest.orion.core.kafka.KafkaCluster;
import com.pinterest.orion.core.memq.MemqCluster;
+import com.pinterest.orion.core.clickhouse.ClickHouseCluster;
public class ClusterTypeMap {
@@ -36,6 +37,7 @@ public class ClusterTypeMap {
clusterTypeMap.put("kafka", KafkaCluster.class);
clusterTypeMap.put("memq", MemqCluster.class);
clusterTypeMap.put("hbase", HBaseCluster.class);
+ clusterTypeMap.put("clickhouse", ClickHouseCluster.class);
try {
Reflections reflections = new Reflections("com.pinterest.orion.core");
diff --git a/orion-server/src/test/resources/configs/clickhouse-server.yaml b/orion-server/src/test/resources/configs/clickhouse-server.yaml
new file mode 100644
index 00000000..ca037251
--- /dev/null
+++ b/orion-server/src/test/resources/configs/clickhouse-server.yaml
@@ -0,0 +1,37 @@
+logging:
+ level: INFO
+ appenders:
+ - type: file
+ # The file to which current statements will be logged.
+ currentLogFilename: /var/log/orion/orion.log
+ # When the log file rotates, the archived log will be renamed to this and gzipped. The
+ # %d is replaced with the previous day (yyyy-MM-dd). Custom rolling windows can be created
+ # by passing a SimpleDateFormat-compatible format as an argument: "%d{yyyy-MM-dd-hh}".
+ archivedLogFilenamePattern: /var/log/orion/orion-%d.log.gz
+ # The number of archived files to keep.
+ archivedFileCount: 5
+
+server:
+ requestLog:
+ appenders: []
+ applicationConnectors:
+ - type: http
+ port: 8090
+ bindHost: 127.0.0.1
+ adminConnectors:
+ - type: http
+ port: 8444
+ bindHost: 0.0.0.0
+
+clusterConfigs:
+ - clusterId: testclickhouse
+ type: clickhouse
+ configuration:
+ serversetPath: /opt/orion-server/discovery.testclickhouse.test
+
+plugins:
+ sensorConfigs:
+ - key: clusterSensor
+ class: com.pinterest.orion.core.automation.sensor.clickhouse.ClickHouseClusterSensor
+ interval: 60
+ enabled: true
\ No newline at end of file