Skip to content

Commit

Permalink
Merge pull request #281 from pinterest/orion-clickhouse-sense-cluster…
Browse files Browse the repository at this point in the history
…s-nodes

Add Orion sensor for ClickHouse clusters/nodes
  • Loading branch information
ambud authored Jun 27, 2024
2 parents 6303478 + 33f422b commit 1d9674e
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,19 @@ public void setNodeType(String nodeType) {
this.nodeType = nodeType;
}

protected String listPropertiesStr() {
return "nodeId=" + nodeId + ", hostname=" + hostname + ", ip=" + ip + ", clusterId="
+ clusterId + ", servicePort=" + servicePort + ", localtime="
+ localtime + ", rack=" + rack + ", serviceInfo=" + serviceInfo + ", agentSettings="
+ agentSettings + ", environment=" + environment;
}

/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "NodeInfo [nodeId=" + nodeId + ", hostname=" + hostname + ", ip=" + ip + ", clusterId="
+ clusterId + ", servicePort=" + servicePort + ", localtime="
+ localtime + ", rack=" + rack + ", serviceInfo=" + serviceInfo + ", agentSettings="
+ agentSettings + ", environment=" + environment + "]";
return "NodeInfo [" + listPropertiesStr() + "]";
}

}
10 changes: 10 additions & 0 deletions orion-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-http-client</artifactId>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.3</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package com.pinterest.orion.core.automation.sensor.clickhouse;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;

import java.util.concurrent.ExecutionException;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseCredentials;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseRecord;

import com.pinterest.orion.common.NodeInfo;
import com.pinterest.orion.core.Node;
import com.pinterest.orion.core.PluginConfigurationException;
import com.pinterest.orion.core.clickhouse.ClickHouseCluster;
import com.pinterest.orion.core.clickhouse.ClickHouseNodeInfo;

public class ClickHouseClusterSensor extends ClickHouseSensor {

public static final String CLUSTER_COL = "cluster";
public static final String SHARD_NUM_COL = "shard_num";
public static final String SHARD_WEIGHT_COL = "shard_weight";
public static final String REPLICA_NUM_COL = "replica_num";
public static final String HOST_NAME_COL = "host_name";

public static final String CLUSTERS_QUERY = "SELECT * FROM system.clusters WHERE is_local=1";

private Map<String, Object> config;

@Override
public String getName() {
return "clickhouseclustersensor";
}

@Override
public void initialize(Map<String, Object> config) throws PluginConfigurationException {
super.initialize(config);
}

private void addNodesFromServerSet(ClickHouseCluster cluster, String serversetPath) throws Exception {
List<String> lines = Files.readAllLines(new File(serversetPath).toPath());
for (String serverStr : lines) {
ClickHouseNodeInfo nodeInfo = new ClickHouseNodeInfo();
nodeInfo.setNodeId(serverStr);
nodeInfo.setClusterId(cluster.getClusterId());

String[] splits = serverStr.split(":");
String ip = splits[0];
int port = Integer.parseInt(splits[1]);
nodeInfo.setIp(ip);
nodeInfo.setServicePort(port);

queryShardReplicaInfoFromNode(cluster, nodeInfo, ip, port);

Node node = cluster.getNodeMap().get(serverStr);
if (node == null) {
logger.info("Adding new node with info " + nodeInfo);
} else {
logger.info("Updating node; existing info " + node.getCurrentNodeInfo()
+ ", new info " + nodeInfo);
}
cluster.addNodeWithoutAgent(nodeInfo);
}
}

@Override
public void sense(ClickHouseCluster cluster) throws Exception {
String serversetPath = cluster.getAttribute(cluster.SERVERSET_PATH).getValue();
addNodesFromServerSet(cluster, serversetPath);
}

private void queryShardReplicaInfo(
ClickHouseNode server, ClickHouseNodeInfo nodeInfo) throws ClickHouseException {
try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol());
// each node stores the clusters info in the table system.clusters
// here we query the shard/replica info each node stores for itself
// TODO: build an auditor action to make sure each node stores
// the consistent info about all the other nodes
ClickHouseResponse response = client.read(server)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query(CLUSTERS_QUERY).execute().get()) {
for (ClickHouseRecord r : response.records()) {
// note that within each cluster, there can be smaller
// logical clusters, e.g. created using `Replicated`
String cluster = r.getValue(CLUSTER_COL).asString();
int shard = r.getValue(SHARD_NUM_COL).asInteger();
int shardWeight = r.getValue(SHARD_WEIGHT_COL).asInteger();
int replicaNum = r.getValue(REPLICA_NUM_COL).asInteger();
String hostName = r.getValue(HOST_NAME_COL).asString();

nodeInfo.setHostname(hostName);
nodeInfo.addShardReplicaInfo(cluster, shard, shardWeight, replicaNum);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw ClickHouseException.forCancellation(e, server);
} catch (ExecutionException e) {
throw ClickHouseException.of(e, server);
}
}

private void queryShardReplicaInfoFromNode(
ClickHouseCluster cluster, ClickHouseNodeInfo nodeInfo, String host, int port)
throws ClickHouseException {
String user = cluster.getAttribute(cluster.USER).getValue();
String password = cluster.getAttribute(cluster.PASSWORD).getValue();

ClickHouseNode server = ClickHouseNode.builder()
.host(host)
.port(ClickHouseProtocol.HTTP, port)
.database("system").credentials(
ClickHouseCredentials.fromUserAndPassword(user, password)
).build();
queryShardReplicaInfo(server, nodeInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.pinterest.orion.core.automation.sensor.clickhouse;

import com.pinterest.orion.core.Cluster;
import com.pinterest.orion.core.automation.sensor.Sensor;
import com.pinterest.orion.core.clickhouse.ClickHouseCluster;

public abstract class ClickHouseSensor extends Sensor {

@Override
public void observe(Cluster cluster) throws Exception {
if (logger == null) {
logger = getLogger(cluster);
}
if(cluster instanceof ClickHouseCluster){
sense((ClickHouseCluster) cluster);
}
}

public abstract void sense(ClickHouseCluster cluster) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ public class ClickHouseCluster extends Cluster {
private static final Logger logger = Logger.getLogger(ClickHouseNode.class.getName());
public static final String CLUSTER_REGION = "region";
public static final String DEFAULT_REGION = "us-east-1";

public static final String USER = "user";
private static final String DEFAULT_USER = "default";

public static final String PASSWORD = "password";
private static final String DEFAULT_PASSWORD = "";

public static final String SERVERSET_PATH = "serversetPath";

private Map<String, Object> config;

public ClickHouseCluster(String id,
Expand All @@ -44,6 +53,9 @@ public ClickHouseCluster(String id,
protected void bootstrapClusterInfo(Map<String, Object> config) throws PluginConfigurationException {
this.config = config;
setAttribute(CLUSTER_REGION, config.getOrDefault(CLUSTER_REGION, DEFAULT_REGION));
setAttribute(USER, config.getOrDefault(USER, DEFAULT_USER));
setAttribute(PASSWORD, config.getOrDefault(PASSWORD, DEFAULT_PASSWORD));
setAttribute(SERVERSET_PATH, config.get(SERVERSET_PATH));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.pinterest.orion.core.clickhouse;

import com.pinterest.orion.common.NodeInfo;

import java.io.Serializable;
import java.util.Map;
import java.util.HashMap;

class ShardReplicaInfo {
public int shardNum;
public int shardWeight;
public int replicaNum;

ShardReplicaInfo(int shardNum, int shardWeight, int replicaNum) {
this.shardNum = shardNum;
this.shardWeight = shardWeight;
this.replicaNum = replicaNum;
}

public String toString() {
return "{shardNum: " + shardNum + ", shardWeight: "
+ shardWeight + " replicaNum: " + replicaNum + "}";
}
}

public class ClickHouseNodeInfo extends NodeInfo implements Serializable {
Map<String, ShardReplicaInfo> infoByCluster = new HashMap<>();

public void addShardReplicaInfo(String cluster, int shardNum, int shardWeight, int replicaNum) {
infoByCluster.put(cluster, new ShardReplicaInfo(shardNum, shardWeight, replicaNum));
}

public int getShardNum(String cluster) {
return infoByCluster.get(cluster).shardNum;
}

public int getShardWeight(String cluster) {
return infoByCluster.get(cluster).shardWeight;
}

public int getReplicaNum(String cluster) {
return infoByCluster.get(cluster).replicaNum;
}

@Override
protected String listPropertiesStr() {
return super.listPropertiesStr() + ", infoByCluster=" + infoByCluster.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.pinterest.orion.core.hbase.HBaseCluster;
import com.pinterest.orion.core.kafka.KafkaCluster;
import com.pinterest.orion.core.memq.MemqCluster;
import com.pinterest.orion.core.clickhouse.ClickHouseCluster;

public class ClusterTypeMap {

Expand All @@ -36,6 +37,7 @@ public class ClusterTypeMap {
clusterTypeMap.put("kafka", KafkaCluster.class);
clusterTypeMap.put("memq", MemqCluster.class);
clusterTypeMap.put("hbase", HBaseCluster.class);
clusterTypeMap.put("clickhouse", ClickHouseCluster.class);

try {
Reflections reflections = new Reflections("com.pinterest.orion.core");
Expand Down
37 changes: 37 additions & 0 deletions orion-server/src/test/resources/configs/clickhouse-server.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
logging:
level: INFO
appenders:
- type: file
# The file to which current statements will be logged.
currentLogFilename: /var/log/orion/orion.log
# When the log file rotates, the archived log will be renamed to this and gzipped. The
# %d is replaced with the previous day (yyyy-MM-dd). Custom rolling windows can be created
# by passing a SimpleDateFormat-compatible format as an argument: "%d{yyyy-MM-dd-hh}".
archivedLogFilenamePattern: /var/log/orion/orion-%d.log.gz
# The number of archived files to keep.
archivedFileCount: 5

server:
requestLog:
appenders: []
applicationConnectors:
- type: http
port: 8090
bindHost: 127.0.0.1
adminConnectors:
- type: http
port: 8444
bindHost: 0.0.0.0

clusterConfigs:
- clusterId: testclickhouse
type: clickhouse
configuration:
serversetPath: /opt/orion-server/discovery.testclickhouse.test

plugins:
sensorConfigs:
- key: clusterSensor
class: com.pinterest.orion.core.automation.sensor.clickhouse.ClickHouseClusterSensor
interval: 60
enabled: true

0 comments on commit 1d9674e

Please sign in to comment.