Skip to content

Commit

Permalink
Merge pull request #308 from pinterest/memq_zk_client
Browse files Browse the repository at this point in the history
Enhancements to MemQ Cluster Sensor
  • Loading branch information
yisheng-zhou authored Nov 20, 2024
2 parents a57b112 + 8bb3260 commit 5267c45
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import java.util.Map;
import java.util.Set;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import com.pinterest.orion.core.utils.memq.zookeeper.MemqZookeeperClient;

import com.google.gson.Gson;
import com.pinterest.orion.common.NodeInfo;
Expand All @@ -39,9 +37,6 @@ public class MemqClusterSensor extends MemqSensor {

public static final String WRITE_ASSIGNMENTS = "writeAssignments";
public static final String TOPIC_CONFIG = "topicconfig";
public static final String BROKERS = "/brokers";
public static final String TOPICS = "/topics";
public static final String GOVERNOR = "/governor";
public static final String RAW_BROKER_INFO = "rawBrokerInfo";

@Override
Expand All @@ -57,29 +52,17 @@ public void initialize(Map<String, Object> config) throws PluginConfigurationExc
@Override
public void sense(MemqCluster cluster) throws Exception {
try {
if (cluster.getZkClient() == null) {
String zkUrl = cluster.getAttribute(MemqCluster.ZK_CONNECTION_STRING).getValue();
CuratorFramework curator = CuratorFrameworkFactory.newClient(zkUrl,
new ExponentialBackoffRetry(1000, 3));
curator.start();
curator.blockUntilConnected();
cluster.setZkClient(curator);
}
MemqZookeeperClient memqZookeeperClient = new MemqZookeeperClient(cluster);

CuratorFramework zkClient = cluster.getZkClient();
List<String> brokerNames = zkClient.getChildren().forPath(BROKERS);

List<String> brokerNames = memqZookeeperClient.getBrokerNames();
Map<String, List<String>> writeBrokerAssignments = new HashMap<>();

Map<String, Broker> rawBrokerMap = new HashMap<>();

Gson gson = new Gson();

Set<String> brokersInZookeeper = new HashSet<>();
for (String brokerName : brokerNames) {
byte[] brokerData = null;
String brokerDataJsonString = null;
try {
brokerData = zkClient.getData().forPath(BROKERS + "/" + brokerName);
brokerDataJsonString = memqZookeeperClient.getBrokerData(brokerName);
} catch (KeeperException.NoNodeException e) {
cluster.getNodeMap().remove(brokerName);
logger.info(
Expand All @@ -90,7 +73,7 @@ public void sense(MemqCluster cluster) throws Exception {
"Faced an unknown exception when getting broker data for " + brokerName +" from zookeeper:" + e);
continue;
}
Broker broker = gson.fromJson(new String(brokerData), Broker.class);
Broker broker = gson.fromJson(brokerDataJsonString, Broker.class);
NodeInfo info = new NodeInfo();
info.setClusterId(cluster.getClusterId());
String hostname = NetworkUtils.getHostnameFromIpIfAvailable(broker.getBrokerIP());
Expand All @@ -105,19 +88,21 @@ public void sense(MemqCluster cluster) throws Exception {

rawBrokerMap.put(broker.getBrokerIP(), broker);
for (TopicConfig topicConfig : broker.getAssignedTopics()) {
String topic = topicConfig.getTopic();
List<String> hostnames = writeBrokerAssignments.get(topic);
String topicName = topicConfig.getTopic();
List<String> hostnames = writeBrokerAssignments.get(topicName);
if (hostnames == null) {
hostnames = new ArrayList<>();
writeBrokerAssignments.put(topic, hostnames);
writeBrokerAssignments.put(topicName, hostnames);
}
hostnames.add(hostname);
}
brokersInZookeeper.add(broker.getBrokerIP());
}

boolean noBrokerInZookeeper = false;
if (brokersInZookeeper.isEmpty()) {
logger.warning("No broker found in zookeeper for cluster " + cluster.getClusterId());
noBrokerInZookeeper = true;
} else {
// Remove brokers that are not in zookeeper from the cluster node map
for (String nodeId : cluster.getNodeMap().keySet()) {
Expand All @@ -128,16 +113,20 @@ public void sense(MemqCluster cluster) throws Exception {
}

Map<String, TopicConfig> topicConfigMap = new HashMap<>();
List<String> topics = zkClient.getChildren().forPath(TOPICS);
for (String topic : topics) {
byte[] topicData = zkClient.getData().forPath(TOPICS + "/" + topic);
TopicConfig topicConfig = gson.fromJson(new String(topicData), TopicConfig.class);
topicConfigMap.put(topic, topicConfig);
List<String> topics = memqZookeeperClient.getTopics();
for (String topicName : topics) {
String topicDataJsonString = memqZookeeperClient.getTopicData(topicName);
TopicConfig topicConfig = gson.fromJson(topicDataJsonString, TopicConfig.class);
topicConfigMap.put(topicName, topicConfig);
}

byte[] governorData = zkClient.getData().forPath(GOVERNOR);
String governorIp = new String(governorData);
String clusterContext = "Governor: " + governorIp + "\n";
String clusterContext = "NO BROKER";
if (!noBrokerInZookeeper) {
String governorIp = memqZookeeperClient.getGovernorIp();
if (governorIp != null) {
clusterContext = "Governor: " + governorIp + "\n";
}
}

setAttribute(cluster, TOPIC_CONFIG, topicConfigMap);
setAttribute(cluster, RAW_BROKER_INFO, rawBrokerMap);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package com.pinterest.orion.core.utils.memq.zookeeper;

import com.pinterest.orion.core.memq.MemqCluster;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Reads MemQ cluster metadata (brokers, topics, governor) from Zookeeper through a Curator
 * client.
 *
 * <p>The Curator client is taken from the {@link MemqCluster} when one is already set there;
 * otherwise a new client is created and stored on the cluster. When a read fails and
 * refresh-on-exception is enabled (the default), the client is replaced by a newly created one
 * and the read is attempted exactly once more.
 */
public class MemqZookeeperClient {
  public static final String BROKERS = "/brokers";
  public static final String TOPICS = "/topics";
  public static final String GOVERNOR = "/governor";

  private static final Logger logger = Logger.getLogger(MemqZookeeperClient.class.getName());

  private boolean refreshZkClientOnException = true;
  private final String zkUrl;
  private final MemqCluster cluster;
  private CuratorFramework zkClient;

  /**
   * Binds this reader to the given cluster, reusing the cluster's Zookeeper client when present
   * and creating (and registering) a new one otherwise.
   *
   * @param cluster the cluster whose Zookeeper connection string and client are used
   * @throws Exception if a new Zookeeper client has to be created and that fails
   */
  public MemqZookeeperClient(MemqCluster cluster) throws Exception {
    this.zkUrl = cluster.getAttribute(MemqCluster.ZK_CONNECTION_STRING).getValue();
    this.cluster = cluster;
    if (cluster.getZkClient() != null) {
      this.zkClient = cluster.getZkClient();
    } else {
      refreshZkClient();
    }
  }

  /** Enables replacing the Zookeeper client and retrying once when a read fails. */
  public void enableRefreshZkClientOnException() {
    this.refreshZkClientOnException = true;
  }

  /** Disables the refresh-and-retry behavior; read failures are propagated immediately. */
  public void disableRefreshZkClientOnException() {
    this.refreshZkClientOnException = false;
  }

  /**
   * Create a new Zookeeper client using the connection string provided in the cluster
   * configuration, start it and block until it is connected.
   *
   * @return a started, connected CuratorFramework
   * @throws Exception if starting or connecting fails; the half-initialized client is closed
   *         before the exception is propagated
   */
  private CuratorFramework createZkClient() throws Exception {
    CuratorFramework curator = CuratorFrameworkFactory.newClient(
        zkUrl,
        new ExponentialBackoffRetry(1000, 3)
    );
    try {
      curator.start();
      curator.blockUntilConnected();
      return curator;
    } catch (Exception e) {
      // Do not leak the started client (and its background threads) when connecting fails.
      closeQuietly(curator);
      throw e;
    }
  }

  /**
   * Closes a Curator client, logging instead of propagating any failure.
   *
   * @param client the client to close
   */
  private static void closeQuietly(CuratorFramework client) {
    try {
      client.close();
    } catch (Exception e) {
      logger.log(Level.WARNING, "Failed to close Zookeeper client", e);
    }
  }

  /**
   * Refresh the Zookeeper client by creating a new one.
   * The new client is then set in the cluster object and the client previously held by this
   * instance, if any, is closed.
   *
   * @throws Exception if the new client cannot be created; in that case the previous client is
   *         left in place and is not closed
   */
  public void refreshZkClient() throws Exception {
    CuratorFramework previous = this.zkClient;
    CuratorFramework fresh = createZkClient();
    this.zkClient = fresh;
    cluster.setZkClient(fresh);
    // Close the replaced client only after the new one is published on the cluster, so the
    // cluster never points at a closed client.
    // NOTE(review): assumes no other component keeps using the replaced client - confirm.
    if (previous != null && previous != fresh) {
      closeQuietly(previous);
    }
  }

  /**
   * Runs a Zookeeper read. When it fails and the refreshZkClientOnException flag is set, the
   * Zookeeper client is refreshed and the read is attempted one more time.
   *
   * <p>The read must access the {@code zkClient} field when it is invoked (not capture the
   * client beforehand) so that the retry uses the refreshed client.
   *
   * <p>NOTE(review): every exception other than an interrupt triggers a refresh, including
   * "node does not exist" style failures that a new connection cannot fix - consider excluding
   * those.
   *
   * @param read the read operation to execute
   * @param <T> the type of the value produced by the read
   * @return the value produced by the read
   * @throws InterruptedException if the calling thread is interrupted; no refresh is attempted
   * @throws Exception if the read fails and refreshing is disabled, if the refresh fails, or if
   *         the retried read fails
   */
  private <T> T readWithRefresh(Callable<T> read) throws Exception {
    try {
      return read.call();
    } catch (InterruptedException e) {
      // An interrupt is a request to stop, not a connection problem: do not reconnect and retry.
      throw e;
    } catch (Exception e) {
      if (!refreshZkClientOnException) {
        throw e;
      }
      refreshZkClient();
      return read.call();
    }
  }

  /**
   * Get the children of a node in Zookeeper.
   * When an exception occurs, the Zookeeper client is refreshed and the children are fetched
   * again if the refreshZkClientOnException flag is set.
   *
   * @param path The path of the node.
   * @return List of children node names.
   * @throws Exception if the children cannot be read, including after the single retry
   */
  private List<String> getChildNodes(String path) throws Exception {
    return readWithRefresh(() -> zkClient.getChildren().forPath(path));
  }

  /**
   * Get the data of a node in Zookeeper, decoded as UTF-8 text.
   * When an exception occurs, the Zookeeper client is refreshed and the data is fetched again if
   * the refreshZkClientOnException flag is set.
   *
   * @param path The path of the node.
   * @return The data of the node as a string (a json string for broker and topic nodes).
   * @throws Exception if the data cannot be read, including after the single retry
   */
  private String getNodeData(String path) throws Exception {
    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
    return readWithRefresh(
        () -> new String(zkClient.getData().forPath(path), StandardCharsets.UTF_8));
  }

  /**
   * Get the names of the brokers in Zookeeper.
   *
   * @return List of broker names.
   * @throws Exception if the broker list cannot be read
   */
  public List<String> getBrokerNames() throws Exception {
    return getChildNodes(BROKERS);
  }

  /**
   * Get the data of a broker in Zookeeper.
   *
   * @param brokerName The name of the broker.
   * @return The data of the broker as a json string.
   * @throws Exception if the broker data cannot be read
   */
  public String getBrokerData(String brokerName) throws Exception {
    return getNodeData(BROKERS + "/" + brokerName);
  }

  /**
   * Get the names of the topics in Zookeeper.
   *
   * @return List of topic names.
   * @throws Exception if the topic list cannot be read
   */
  public List<String> getTopics() throws Exception {
    return getChildNodes(TOPICS);
  }

  /**
   * Get the data of a topic in Zookeeper.
   *
   * @param topicName The name of the topic.
   * @return The data of the topic as a json string.
   * @throws Exception if the topic data cannot be read
   */
  public String getTopicData(String topicName) throws Exception {
    return getNodeData(TOPICS + "/" + topicName);
  }

  /**
   * Get the IP address of the governor in Zookeeper.
   * In memq zookeeper, the governor is a node at the path "/governor". Its data is the IP
   * address of the governor.
   *
   * <p>This lookup is best-effort: any failure is logged and reported as {@code null} rather
   * than thrown. If the thread was interrupted, its interrupt flag is restored before returning.
   *
   * @return The IP address of the governor, or {@code null} if it could not be read.
   * @throws Exception declared for interface compatibility; failures are reported as
   *         {@code null}
   */
  public String getGovernorIp() throws Exception {
    try {
      return getNodeData(GOVERNOR);
    } catch (InterruptedException e) {
      // Keep the best-effort contract but do not lose the interrupt request.
      Thread.currentThread().interrupt();
      logger.log(Level.WARNING, "Interrupted while reading governor IP from Zookeeper", e);
      return null;
    } catch (Exception e) {
      logger.log(Level.WARNING,
          "Failed to read governor IP from Zookeeper path " + GOVERNOR, e);
      return null;
    }
  }
}

0 comments on commit 5267c45

Please sign in to comment.