Skip to content

Commit 30bbab5

Browse files
authoredDec 12, 2024
Display Broker and Brokerset Information in Orion UI
This update enables Orion to store and display metrics for brokers and brokersets within the Orion interface. ## Details ### Broker Class - Introduces two new maps: - **`rawBrokerStatus`**: Stores raw data from the metrics API call as a map of `String -> Double`. Example: `cpu_usage -> 0.22`. - **`brokerStatus`**: Contains the keys and values that are displayed in the Orion UI. Both values are strings. Example: `"CPU Usage"` and `"22%"`. ### Brokerset Class - Also includes two new maps: - **`rawBrokersetStatus`**: Stores raw data after aggregating values across all brokers as a map of `String -> Double`. Example: `max_cpu_usage -> 0.22`. - **`brokersetStatus`**: Contains the keys and values to be shown in the Orion UI. Both values are strings. Example: `"Max CPU Usage Across All Brokers"` and `"22%"`. ### BrokerMetricsSensor Class - This class will be overridden by a sensor from the internal package. - The sensor is intended to periodically call metrics APIs, such as the CloudWatch API, and update the `rawBrokerStatus`. ### JavaScript Changes - Multiple JavaScript updates were made to display the information. - The Brokersets page in the Service section will show: - Maximum CPU usage over 7 days. - Maximum disk usage over 7 days for all brokers. ## Testing - The changes have been successfully tested within Pinterest.
2 parents 869442a + 2e28b5e commit 30bbab5

File tree

7 files changed

+155
-11
lines changed

7 files changed

+155
-11
lines changed
 

‎orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java

+38-8
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,26 @@ public class NodeInfo implements Serializable {
3636
private Map<String, String> agentSettings;
3737
private Map<String, String> environment;
3838
private Set<String> brokersets = new HashSet<>();
39+
/**
40+
* Save the broker status in printable format (string)
41+
*/
3942
private Map<String, String> brokerStatus;
43+
/**
44+
* Save the broker status in raw format (number)
45+
*/
46+
private Map<String, Double> rawBrokerStatus;
47+
/**
48+
* @param rawBrokerStatus
49+
*/
50+
public void setRawBrokerStatus(Map<String, Double> rawBrokerStatus) {
51+
this.rawBrokerStatus = rawBrokerStatus;
52+
}
53+
/**
54+
* @return the rawBrokerStatus
55+
*/
56+
public Map<String, Double> getRawBrokerStatus() {
57+
return rawBrokerStatus;
58+
}
4059
/**
4160
* @param brokerStatus
4261
*/
@@ -202,15 +221,26 @@ public void setNodeType(String nodeType) {
202221
this.nodeType = nodeType;
203222
}
204223

205-
/* (non-Javadoc)
206-
* @see java.lang.Object#toString()
207-
*/
208224
@Override
209225
public String toString() {
210-
return "NodeInfo [nodeId=" + nodeId + ", hostname=" + hostname + ", ip=" + ip + ", clusterId="
211-
+ clusterId + ", servicePort=" + servicePort + ", localtime="
212-
+ localtime + ", rack=" + rack + ", serviceInfo=" + serviceInfo + ", agentSettings="
213-
+ agentSettings + ", environment=" + environment + "]";
226+
return String.format(
227+
"NodeInfo [timestamp=%d, nodeId=%s, hostname=%s, ip=%s, clusterId=%s, servicePort=%d, " +
228+
"localtime=%d, rack=%s, nodeType=%s, serviceInfo=%s, agentSettings=%s, environment=%s, brokersets=%s, " +
229+
"brokerStatus=%s, rawBrokerStatus=%s]",
230+
timestamp,
231+
nodeId,
232+
hostname,
233+
ip,
234+
clusterId,
235+
servicePort,
236+
localtime,
237+
rack,
238+
nodeType,
239+
serviceInfo,
240+
agentSettings,
241+
environment,
242+
brokersets,
243+
brokerStatus,
244+
rawBrokerStatus);
214245
}
215-
216246
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.pinterest.orion.core.automation.sensor.kafka;
2+
3+
import com.pinterest.orion.core.kafka.KafkaCluster;
4+
5+
public class BrokerMetricsSensor extends KafkaSensor {
6+
7+
@Override
8+
public String getName() {
9+
return "BrokerMetricsSensor";
10+
}
11+
12+
@Override
13+
public void sense(KafkaCluster cluster) throws Exception {
14+
}
15+
}

‎orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java

+23
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,16 @@ public void sense(KafkaCluster cluster) throws Exception {
7979
brokersetState.addBrokerRange(Arrays.asList(start, end));
8080
}
8181
brokersetState.setBrokerIds(new ArrayList<>(brokerIds));
82+
try {
83+
updateBrokersetStateWithMetrics(cluster, brokersetState, brokerIds);
84+
} catch (Exception e) {
85+
logger.warning(
86+
String.format(
87+
"Failed to update brokerset state with metrics for brokerset %s in cluster %s. Error: %s",
88+
brokersetAlias,
89+
cluster.getName(),
90+
e.getMessage()));
91+
}
8292
brokersetStateMap.put(brokersetAlias, brokersetState);
8393
if (invalidBrokerset) {
8494
handleInvalidBrokerset(brokersetAlias, cluster.getName());
@@ -95,6 +105,19 @@ public void sense(KafkaCluster cluster) throws Exception {
95105
}
96106
}
97107

108+
/**
109+
* Update brokerset state with metrics.
110+
* This method should be overridden by subclasses to update brokerset state with metrics.
111+
* @param cluster Kafka cluster.
112+
* @param brokersetState Brokerset state to update. It has state fields and raw metrics fields to update.
113+
* @param brokerIds Broker ids in the brokerset.
114+
*/
115+
protected void updateBrokersetStateWithMetrics(
116+
KafkaCluster cluster,
117+
BrokersetState brokersetState,
118+
Set<String> brokerIds) {
119+
}
120+
98121
@Override
99122
public String getName() {
100123
return "BrokersetStateSensor";

‎orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java

+31
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,17 @@
33

44
import java.util.ArrayList;
55
import java.util.List;
6+
import java.util.Map;
67

78
public class BrokersetState {
89
/**
910
* The brokersetAlias is the alias of the brokerset.
1011
*/
1112
private String brokersetAlias;
13+
/**
14+
* The instanceType is the type of the brokerset.
15+
*/
16+
private String instanceType;
1217
/**
1318
* The brokersetRanges are the ranges of brokerset that are in the brokerset.
1419
* The brokersetRanges are obtained from the brokerset configuration file.
@@ -19,6 +24,14 @@ public class BrokersetState {
1924
* The brokerIds are obtained from the cluster state.
2025
*/
2126
private List<String> brokerIds = new ArrayList<>();
27+
/**
28+
* Save the broker status in printable format (string).
29+
*/
30+
private Map<String, Double> rawBrokersetStatus;
31+
/**
32+
* Save the broker status in raw format (number).
33+
*/
34+
private Map<String, String> brokersetStatus;
2235
/**
2336
* The constructor of BrokersetState.
2437
* @param brokersetAlias
@@ -108,4 +121,22 @@ public List<String> getBrokerIds() {
108121
public void setBrokerIds(List<String> brokerIds) {
109122
this.brokerIds = brokerIds;
110123
}
124+
public void setRawBrokersetStatus(Map<String, Double> rawBrokersetStatus) {
125+
this.rawBrokersetStatus = rawBrokersetStatus;
126+
}
127+
public Map<String, Double> getRawBrokersetStatus() {
128+
return rawBrokersetStatus;
129+
}
130+
public void setBrokersetStatus(Map<String, String> brokersetStatus) {
131+
this.brokersetStatus = brokersetStatus;
132+
}
133+
public Map<String, String> getBrokersetStatus() {
134+
return brokersetStatus;
135+
}
136+
public void setInstanceType(String instanceType) {
137+
this.instanceType = instanceType;
138+
}
139+
public String getInstanceType() {
140+
return instanceType;
141+
}
111142
}

‎orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js

+19-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,13 @@ const routes = [
3030
function getStatsData(clusterId, rawData) {
3131
let brokersetStats = [];
3232
let brokersetData = rawData.brokersetData;
33-
brokersetStats.push({ key: "Broker Count", value: brokersetData.size});
33+
brokersetStats.push({ key: "Broker_Count", value: brokersetData.size});
34+
let brokersetStatus = brokersetData.brokersetStatus;
35+
if (brokersetStatus !== undefined && brokersetStatus !== null) {
36+
for (let key of Object.keys(brokersetStatus)) {
37+
brokersetStats.push({ key: key, value: brokersetStatus[key] });
38+
}
39+
}
3440
return brokersetStats;
3541
}
3642

@@ -111,6 +117,10 @@ function getBrokersetInfoHeader(rawData, clusterId) {
111117
let brokersetData = rawData.brokersetData;
112118
let brokersetAlias = brokersetData.brokersetAlias;
113119
let brokerCount = brokersetData.size;
120+
let instanceType = "Unknown";
121+
if (brokersetData.instanceType !== undefined && brokersetData.instanceType !== null) {
122+
instanceType = brokersetData.instanceType;
123+
}
114124
return (
115125
<Box my={2}>
116126
<Grid container display="flex" alignItems="center" spacing={2}>
@@ -130,6 +140,14 @@ function getBrokersetInfoHeader(rawData, clusterId) {
130140
label={brokerCount + " brokers"}
131141
/>
132142
</Grid>
143+
<Grid item>
144+
<Chip
145+
variant="outlined"
146+
color="primary"
147+
size="small"
148+
label={instanceType}
149+
/>
150+
</Grid>
133151
</Grid>
134152
</Box>
135153
);

‎orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js

+22-2
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,37 @@ export default function Brokersets(props) {
4242
}
4343
let columns = [
4444
{ title: "Name", field: "brokersetAlias" },
45-
{ title: "Broker Count", field: "brokerCount" }
45+
{ title: "Broker Count", field: "brokerCount" },
46+
{ title: "Max CPU Usage 7D", field: "maxCpuUsage7Day" },
47+
{ title: "Max Disk Usage 7D", field: "maxDiskUsage7Day" },
48+
{ title: "Updated Time", field: "timestamp" }
4649
]
4750
let clusterId = props.cluster.clusterId;
4851
let brokersetToRowValuesMap = {};
4952
for (let brokerset of brokersets) {
5053
let brokersetAlias = brokerset.brokersetAlias;
54+
let maxCpuUsage7Day = "N/A";
55+
let maxDiskUsage7Day = "N/A";
56+
let timestamp = "N/A"
57+
if (brokerset.brokersetStatus) {
58+
if (brokerset.brokersetStatus["CPU_Usage_Max_All_Brokers_7Days"] !== undefined) {
59+
maxCpuUsage7Day = brokerset.brokersetStatus["CPU_Usage_Max_All_Brokers_7Days"];
60+
}
61+
if (brokerset.brokersetStatus["Disk_Usage_Max_All_Brokers_7Days"] !== undefined) {
62+
maxDiskUsage7Day = brokerset.brokersetStatus["Disk_Usage_Max_All_Brokers_7Days"];
63+
}
64+
if (brokerset.brokersetStatus["Short_Timestamp"] !== undefined) {
65+
timestamp = brokerset.brokersetStatus["Short_Timestamp"];
66+
}
67+
}
5168
brokersetToRowValuesMap[brokersetAlias] = {
5269
"brokersetAlias": brokersetAlias,
5370
"clusterId": clusterId,
5471
"brokerCount": brokerset.size,
55-
"brokersetData": brokerset
72+
"brokersetData": brokerset,
73+
"maxCpuUsage7Day": maxCpuUsage7Day,
74+
"maxDiskUsage7Day": maxDiskUsage7Day,
75+
"timestamp": timestamp
5676
}
5777
}
5878

‎orion-server/src/main/resources/webapp/src/basic-components/Kafka/KafkaNode.js

+7
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,13 @@ function getBrokersetColumns() {
268268

269269
function getBrokerStatsData(cluster, node) {
270270
let brokerStatsData = [];
271+
let brokerStats = node.currentNodeInfo.brokerStatus;
272+
if (brokerStats === undefined || brokerStats === null) {
273+
return brokerStatsData;
274+
}
275+
for (let [key, value] of Object.entries(brokerStats)) {
276+
brokerStatsData.push({key: key, value: JSON.stringify(value)});
277+
}
271278
return brokerStatsData;
272279
}
273280

0 commit comments

Comments
 (0)
Please sign in to comment.