From 33f422ba6a959cd0751cdf4aeef52da9e3492d65 Mon Sep 17 00:00:00 2001 From: Charles Wu Date: Thu, 27 Jun 2024 22:38:38 +0000 Subject: [PATCH] Add Orion sensor for Clickhouse clusters/nodes --- .../com/pinterest/orion/common/NodeInfo.java | 12 +- orion-server/pom.xml | 10 ++ .../clickhouse/ClickHouseClusterSensor.java | 125 ++++++++++++++++++ .../sensor/clickhouse/ClickHouseSensor.java | 21 +++ .../core/clickhouse/ClickHouseCluster.java | 12 ++ .../core/clickhouse/ClickHouseNodeInfo.java | 49 +++++++ .../orion/server/ClusterTypeMap.java | 2 + .../resources/configs/clickhouse-server.yaml | 37 ++++++ 8 files changed, 264 insertions(+), 4 deletions(-) create mode 100644 orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseClusterSensor.java create mode 100644 orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseSensor.java create mode 100644 orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseNodeInfo.java create mode 100644 orion-server/src/test/resources/configs/clickhouse-server.yaml diff --git a/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java b/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java index 5ef76a7e..ba7ab733 100644 --- a/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java +++ b/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java @@ -174,15 +174,19 @@ public void setNodeType(String nodeType) { this.nodeType = nodeType; } + protected String listPropertiesStr() { + return "nodeId=" + nodeId + ", hostname=" + hostname + ", ip=" + ip + ", clusterId=" + + clusterId + ", servicePort=" + servicePort + ", localtime=" + + localtime + ", rack=" + rack + ", serviceInfo=" + serviceInfo + ", agentSettings=" + + agentSettings + ", environment=" + environment; + } + /* (non-Javadoc) * @see java.lang.Object#toString() */ @Override public String toString() { - return "NodeInfo [nodeId=" + nodeId + ", hostname=" + hostname + ", ip=" + ip + ", clusterId=" - + clusterId + ", servicePort=" + servicePort + ", localtime=" - + localtime + ", rack=" + rack + ", serviceInfo=" + serviceInfo + ", agentSettings=" - + agentSettings + ", environment=" + environment + "]"; + return "NodeInfo [" + listPropertiesStr() + "]"; } } diff --git a/orion-server/pom.xml b/orion-server/pom.xml index b9560bdc..28799334 100644 --- a/orion-server/pom.xml +++ b/orion-server/pom.xml @@ -102,6 +102,16 @@ + + com.clickhouse + clickhouse-http-client + 0.6.0 + + + org.apache.httpcomponents.client5 + httpclient5 + 5.3 + org.apache.curator curator-framework diff --git a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseClusterSensor.java b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseClusterSensor.java new file mode 100644 index 00000000..10d22f8e --- /dev/null +++ b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseClusterSensor.java @@ -0,0 +1,125 @@ +package com.pinterest.orion.core.automation.sensor.clickhouse; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; + +import java.util.concurrent.ExecutionException; + +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseCredentials; +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseResponse; +import com.clickhouse.data.ClickHouseFormat; +import com.clickhouse.data.ClickHouseRecord; + +import com.pinterest.orion.common.NodeInfo; +import com.pinterest.orion.core.Node; +import com.pinterest.orion.core.PluginConfigurationException; +import com.pinterest.orion.core.clickhouse.ClickHouseCluster; +import com.pinterest.orion.core.clickhouse.ClickHouseNodeInfo; + +public class ClickHouseClusterSensor extends ClickHouseSensor { + + public static final String CLUSTER_COL = "cluster"; + public static final String SHARD_NUM_COL = "shard_num"; + public static final String SHARD_WEIGHT_COL = "shard_weight"; + public static final String REPLICA_NUM_COL = "replica_num"; + public static final String HOST_NAME_COL = "host_name"; + + public static final String CLUSTERS_QUERY = "SELECT * FROM system.clusters WHERE is_local=1"; + + private Map config; + + @Override + public String getName() { + return "clickhouseclustersensor"; + } + + @Override + public void initialize(Map config) throws PluginConfigurationException { + super.initialize(config); + } + + private void addNodesFromServerSet(ClickHouseCluster cluster, String serversetPath) throws Exception { + List lines = Files.readAllLines(new File(serversetPath).toPath()); + for (String serverStr : lines) { + ClickHouseNodeInfo nodeInfo = new ClickHouseNodeInfo(); + nodeInfo.setNodeId(serverStr); + nodeInfo.setClusterId(cluster.getClusterId()); + + String[] splits = serverStr.split(":"); + String ip = splits[0]; + int port = Integer.parseInt(splits[1]); + nodeInfo.setIp(ip); + nodeInfo.setServicePort(port); + + queryShardReplicaInfoFromNode(cluster, nodeInfo, ip, port); + + Node node = cluster.getNodeMap().get(serverStr); + if (node == null) { + logger.info("Adding new node with info " + nodeInfo); + } else { + logger.info("Updating node; existing info " + node.getCurrentNodeInfo() + + ", new info " + nodeInfo); + } + cluster.addNodeWithoutAgent(nodeInfo); + } + } + + @Override + public void sense(ClickHouseCluster cluster) throws Exception { + String serversetPath = cluster.getAttribute(cluster.SERVERSET_PATH).getValue(); + addNodesFromServerSet(cluster, serversetPath); + } + + private void queryShardReplicaInfo( + ClickHouseNode server, ClickHouseNodeInfo nodeInfo) throws ClickHouseException { + try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol()); + // each node stores the clusters info in the table system.clusters + // here we query the shard/replica info each node stores for itself + // TODO: build an auditor action to make sure each node stores + // the consistent info about all the other nodes + ClickHouseResponse response = client.read(server) + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) + .query(CLUSTERS_QUERY).execute().get()) { + for (ClickHouseRecord r : response.records()) { + // note that within each cluster, there can be smaller + // logical clusters, e.g. created using `Replicated` + String cluster = r.getValue(CLUSTER_COL).asString(); + int shard = r.getValue(SHARD_NUM_COL).asInteger(); + int shardWeight = r.getValue(SHARD_WEIGHT_COL).asInteger(); + int replicaNum = r.getValue(REPLICA_NUM_COL).asInteger(); + String hostName = r.getValue(HOST_NAME_COL).asString(); + + nodeInfo.setHostname(hostName); + nodeInfo.addShardReplicaInfo(cluster, shard, shardWeight, replicaNum); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw ClickHouseException.forCancellation(e, server); + } catch (ExecutionException e) { + throw ClickHouseException.of(e, server); + } + } + + private void queryShardReplicaInfoFromNode( + ClickHouseCluster cluster, ClickHouseNodeInfo nodeInfo, String host, int port) + throws ClickHouseException { + String user = cluster.getAttribute(cluster.USER).getValue(); + String password = cluster.getAttribute(cluster.PASSWORD).getValue(); + + ClickHouseNode server = ClickHouseNode.builder() + .host(host) + .port(ClickHouseProtocol.HTTP, port) + .database("system").credentials( + ClickHouseCredentials.fromUserAndPassword(user, password) + ).build(); + queryShardReplicaInfo(server, nodeInfo); + } +} diff --git a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseSensor.java b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseSensor.java new file mode 100644 index 00000000..be6985ca --- /dev/null +++ b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/clickhouse/ClickHouseSensor.java @@ -0,0 +1,21 @@ +package com.pinterest.orion.core.automation.sensor.clickhouse; + +import com.pinterest.orion.core.Cluster; +import com.pinterest.orion.core.automation.sensor.Sensor; +import com.pinterest.orion.core.clickhouse.ClickHouseCluster; + +public abstract class ClickHouseSensor extends Sensor { + + @Override + public void observe(Cluster cluster) throws Exception { + if (logger == null) { + logger = getLogger(cluster); + } + if(cluster instanceof ClickHouseCluster){ + sense((ClickHouseCluster) cluster); + } + } + + public abstract void sense(ClickHouseCluster cluster) throws Exception; + +} diff --git a/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseCluster.java b/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseCluster.java index 9b1a1d39..d6b3dc8d 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseCluster.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseCluster.java @@ -25,6 +25,15 @@ public class ClickHouseCluster extends Cluster { private static final Logger logger = Logger.getLogger(ClickHouseNode.class.getName()); public static final String CLUSTER_REGION = "region"; public static final String DEFAULT_REGION = "us-east-1"; + + public static final String USER = "user"; + private static final String DEFAULT_USER = "default"; + + public static final String PASSWORD = "password"; + private static final String DEFAULT_PASSWORD = ""; + + public static final String SERVERSET_PATH = "serversetPath"; + private Map config; public ClickHouseCluster(String id, @@ -44,6 +53,9 @@ public ClickHouseCluster(String id, protected void bootstrapClusterInfo(Map config) throws PluginConfigurationException { this.config = config; setAttribute(CLUSTER_REGION, config.getOrDefault(CLUSTER_REGION, DEFAULT_REGION)); + setAttribute(USER, config.getOrDefault(USER, DEFAULT_USER)); + setAttribute(PASSWORD, config.getOrDefault(PASSWORD, DEFAULT_PASSWORD)); + setAttribute(SERVERSET_PATH, config.get(SERVERSET_PATH)); } @Override diff --git a/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseNodeInfo.java b/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseNodeInfo.java new file mode 100644 index 00000000..68c05a2d --- /dev/null +++ b/orion-server/src/main/java/com/pinterest/orion/core/clickhouse/ClickHouseNodeInfo.java @@ -0,0 +1,49 @@ +package com.pinterest.orion.core.clickhouse; + +import com.pinterest.orion.common.NodeInfo; + +import java.io.Serializable; +import java.util.Map; +import java.util.HashMap; + +class ShardReplicaInfo { + public int shardNum; + public int shardWeight; + public int replicaNum; + + ShardReplicaInfo(int shardNum, int shardWeight, int replicaNum) { + this.shardNum = shardNum; + this.shardWeight = shardWeight; + this.replicaNum = replicaNum; + } + + public String toString() { + return "{shardNum: " + shardNum + ", shardWeight: " + + shardWeight + " replicaNum: " + replicaNum + "}"; + } +} + +public class ClickHouseNodeInfo extends NodeInfo implements Serializable { + Map infoByCluster = new HashMap<>(); + + public void addShardReplicaInfo(String cluster, int shardNum, int shardWeight, int replicaNum) { + infoByCluster.put(cluster, new ShardReplicaInfo(shardNum, shardWeight, replicaNum)); + } + + public int getShardNum(String cluster) { + return infoByCluster.get(cluster).shardNum; + } + + public int getShardWeight(String cluster) { + return infoByCluster.get(cluster).shardWeight; + } + + public int getReplicaNum(String cluster) { + return infoByCluster.get(cluster).replicaNum; + } + + @Override + protected String listPropertiesStr() { + return super.listPropertiesStr() + ", infoByCluster=" + infoByCluster.toString(); + } +} diff --git a/orion-server/src/main/java/com/pinterest/orion/server/ClusterTypeMap.java b/orion-server/src/main/java/com/pinterest/orion/server/ClusterTypeMap.java index a43fad96..dc4f6247 100644 --- a/orion-server/src/main/java/com/pinterest/orion/server/ClusterTypeMap.java +++ b/orion-server/src/main/java/com/pinterest/orion/server/ClusterTypeMap.java @@ -24,6 +24,7 @@ import com.pinterest.orion.core.hbase.HBaseCluster; import com.pinterest.orion.core.kafka.KafkaCluster; import com.pinterest.orion.core.memq.MemqCluster; +import com.pinterest.orion.core.clickhouse.ClickHouseCluster; public class ClusterTypeMap { @@ -36,6 +37,7 @@ public class ClusterTypeMap { clusterTypeMap.put("kafka", KafkaCluster.class); clusterTypeMap.put("memq", MemqCluster.class); clusterTypeMap.put("hbase", HBaseCluster.class); + clusterTypeMap.put("clickhouse", ClickHouseCluster.class); try { Reflections reflections = new Reflections("com.pinterest.orion.core"); diff --git a/orion-server/src/test/resources/configs/clickhouse-server.yaml b/orion-server/src/test/resources/configs/clickhouse-server.yaml new file mode 100644 index 00000000..ca037251 --- /dev/null +++ b/orion-server/src/test/resources/configs/clickhouse-server.yaml @@ -0,0 +1,37 @@ +logging: + level: INFO + appenders: + - type: file + # The file to which current statements will be logged. + currentLogFilename: /var/log/orion/orion.log + # When the log file rotates, the archived log will be renamed to this and gzipped. The + # %d is replaced with the previous day (yyyy-MM-dd). Custom rolling windows can be created + # by passing a SimpleDateFormat-compatible format as an argument: "%d{yyyy-MM-dd-hh}". + archivedLogFilenamePattern: /var/log/orion/orion-%d.log.gz + # The number of archived files to keep. + archivedFileCount: 5 + +server: + requestLog: + appenders: [] + applicationConnectors: + - type: http + port: 8090 + bindHost: 127.0.0.1 + adminConnectors: + - type: http + port: 8444 + bindHost: 0.0.0.0 + +clusterConfigs: + - clusterId: testclickhouse + type: clickhouse + configuration: + serversetPath: /opt/orion-server/discovery.testclickhouse.test + +plugins: + sensorConfigs: + - key: clusterSensor + class: com.pinterest.orion.core.automation.sensor.clickhouse.ClickHouseClusterSensor + interval: 60 + enabled: true \ No newline at end of file