From f0fcab447d834e0705c4c265dedbd67a38598807 Mon Sep 17 00:00:00 2001 From: catdrink <2738035238@qq.com> Date: Sun, 8 Dec 2024 00:39:05 +0800 Subject: [PATCH] proxy cluster and agents --- .../manager/server/proxy/BigtopProxy.java | 6 +- .../manager/server/proxy/PrometheusProxy.java | 576 ++++++++++-------- .../manager/server/utils/ProxyUtils.java | 102 ++++ 3 files changed, 428 insertions(+), 256 deletions(-) create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/utils/ProxyUtils.java diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/proxy/BigtopProxy.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/proxy/BigtopProxy.java index 8e053781..5d26abfc 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/proxy/BigtopProxy.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/proxy/BigtopProxy.java @@ -18,6 +18,7 @@ */ package org.apache.bigtop.manager.server.proxy; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.MediaType; import org.springframework.stereotype.Component; @@ -73,6 +74,9 @@ public JsonNode queryClusterAgentsList(String clusterId) { JsonNode agents = result.get("data").get("content"); agents.forEach(agent -> hosts.add(agent.get("ipv4").asText())); } - return objectMapper.createObjectNode().set("agents", objectMapper.valueToTree(hosts)); + ObjectNode clusterAgents = objectMapper.createObjectNode(); + clusterAgents.put("agentsNum",hosts.size()); + clusterAgents.set("agents", objectMapper.valueToTree(hosts)); + return clusterAgents; } } diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/proxy/PrometheusProxy.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/proxy/PrometheusProxy.java index b914b5a7..cf724b06 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/proxy/PrometheusProxy.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/proxy/PrometheusProxy.java @@ -18,6 +18,7 @@ */ package org.apache.bigtop.manager.server.proxy; +import org.apache.bigtop.manager.server.utils.ProxyUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.MediaType; import org.springframework.stereotype.Component; @@ -31,13 +32,10 @@ import reactor.core.publisher.Mono; import jakarta.annotation.Resource; -import java.time.Duration; + import java.time.Instant; -import java.time.LocalDate; import java.time.LocalDateTime; -import java.time.LocalTime; import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashSet; import java.util.Objects; @@ -54,98 +52,99 @@ public class PrometheusProxy { @Value("${monitoring.agent-host-job-name}") private String agentHostJobName; + public static final String MEM_IDLE = "memIdle"; + public static final String MEM_TOTAL = "memTotal"; + public static final String DISK_IDLE = "diskFreeSpace"; + public static final String DISK_TOTAL = "diskTotalSpace"; + public static final String FILE_OPEN_DESCRIPTOR = "fileOpenDescriptor"; + public static final String FILE_TOTAL_DESCRIPTOR = "fileTotalDescriptor"; + public static final String CPU_LOAD_AVG_MIN_1 = "cpuLoadAvgMin_1"; + public static final String CPU_LOAD_AVG_MIN_5 = "cpuLoadAvgMin_5"; + public static final String CPU_LOAD_AVG_MIN_15 = "cpuLoadAvgMin_15"; + public static final String CPU_USAGE = "cpuUsage"; + public static final String PHYSICAL_CORES = "physical_cores"; + public static final String DISK_READ = "diskRead"; + public static final String DISK_WRITE = "diskWrite"; + public PrometheusProxy( WebClient.Builder webClientBuilder, @Value("${monitoring.prometheus-host}") String prometheusHost) { this.webClient = webClientBuilder.baseUrl(prometheusHost).build(); } - /** - * query agents healthy + * Retrieve current data in real-time */ - public JsonNode queryAgentsHealthyStatus() { + public JsonNode query(String params) { Mono body = webClient .post() .uri(uriBuilder -> uriBuilder.path("/api/v1/query").build()) .contentType(MediaType.APPLICATION_FORM_URLENCODED) - .body(BodyInserters.fromFormData("query", "up{job=\"%s\"}".formatted(agentHostJobName)) + .body(BodyInserters.fromFormData("query", params) .with("timeout", "10")) .retrieve() .bodyToMono(JsonNode.class); JsonNode result = body.block(); - - ObjectMapper objectMapper = new ObjectMapper(); if (result == null || result.isEmpty() || !"success".equals(result.get("status").asText("failure"))) { - return objectMapper.createObjectNode(); - } - JsonNode agents = result.get("data").get("result"); - ArrayNode agentsHealthyStatus = objectMapper.createArrayNode(); - for (JsonNode agent : agents) { - JsonNode agentStatus = agent.get("metric"); - ObjectNode temp = objectMapper.createObjectNode(); - temp.put("agentInfo", agentStatus.get("instance").asText()); - temp.put("prometheusAgentJob", agentStatus.get("job").asText()); - JsonNode status = agent.get("value"); - LocalDateTime instant = Instant.ofEpochSecond(status.get(0).asLong()) - .atZone(ZoneId.systemDefault()) - .toLocalDateTime(); - temp.put("time", instant.toString()); - temp.put("agentHealthyStatus", status.get(1).asInt() == 1 ? "running" : "down"); - agentsHealthyStatus.add(temp); + return null; } - return agentsHealthyStatus; + return result; } - /** - * query instant data + * Retrieve data with a specified interval before the current time */ - public JsonNode query(String params) { + public JsonNode queryRange(String query, long start, long end, String step) { Mono body = webClient .post() - .uri("/api/v1/query") + .uri(uriBuilder -> uriBuilder.path("/api/v1/query_range").build()) .contentType(MediaType.APPLICATION_FORM_URLENCODED) - .body(BodyInserters.fromFormData("query", params).with("timeout", "10")) + .body(BodyInserters.fromFormData("query", query) + .with("timeout", "10") + .with("start",String.valueOf(start)) + .with("end",String.valueOf(end)) + .with("step",step)) .retrieve() .bodyToMono(JsonNode.class); JsonNode result = body.block(); if (result == null || result.isEmpty() - || !"success".equals(result.get("status").asText("failure"))) { + || !"success".equals(result.path("status").asText("failure"))) { return null; } return result; } /** - * query a range of data + * query agents healthy */ - public JsonNode queryRange(String query, long start, long end, String step) { - Mono body = webClient - .get() - .uri(uriBuilder -> uriBuilder - .path("/api/v1/query_range") - .queryParam("query", query) - .queryParam("start", start) - .queryParam("end", end) - .queryParam("step", step) - .build()) - .retrieve() - .bodyToMono(JsonNode.class); - - JsonNode result = body.block(); - if (result == null - || result.isEmpty() - || !"success".equals(result.path("status").asText("failure"))) { - return null; + public JsonNode queryAgentsHealthyStatus() { + JsonNode result = query("up{job=\"%s\"}".formatted(agentHostJobName)); + ObjectMapper objectMapper = new ObjectMapper(); + ArrayNode agentsHealthyStatus = objectMapper.createArrayNode(); + if (result != null) { + JsonNode agents = result.get("data").get("result"); + for (JsonNode agent : agents) { + JsonNode agentStatus = agent.get("metric"); + ObjectNode temp = objectMapper.createObjectNode(); + temp.put("agentInfo", agentStatus.get("instance").asText()); + temp.put("prometheusAgentJob", agentStatus.get("job").asText()); + JsonNode status = agent.get("value"); + LocalDateTime instant = Instant.ofEpochSecond(status.get(0).asLong()) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + temp.put("time", instant.toString()); + temp.put("agentHealthyStatus", status.get(1).asInt() == 1 ? "running" : "down"); + agentsHealthyStatus.add(temp); + } + return agentsHealthyStatus; } - return result; + return objectMapper.createObjectNode(); } /** - * query agents list + * query agents ipv4 list */ - private JsonNode queryAgentsList() { + private JsonNode queryAgents() { JsonNode result = query("agent_host_monitoring_cpu"); ObjectMapper objectMapper = new ObjectMapper(); if (result != null) { @@ -159,162 +158,194 @@ private JsonNode queryAgentsList() { for (String value : iPv4addrSet.toArray(new String[0])) { iPv4addrArray.add(value); } - return objectMapper.createObjectNode().set("iPv4addr", iPv4addrArray); // iPv4 + ObjectNode node = objectMapper.createObjectNode(); + node.put("agentsNum",iPv4addrArray.size()); + node.set("iPv4addr", iPv4addrArray); + return node; // iPv4 } } return objectMapper.createObjectNode(); } /** - * query agents info + * query agents static info */ public JsonNode queryAgentsInfo() { ObjectMapper objectMapper = new ObjectMapper(); ArrayNode agentsInfo = objectMapper.createArrayNode(); - JsonNode agents = queryAgentsList().get("iPv4addr"); // get all host + JsonNode agents = queryAgents().get("iPv4addr"); // get all host for (JsonNode agent : agents) { - JsonNode cpuResult = queryAgentCpu(agent.asText()); - JsonNode memResult = queryAgentMemory(agent.asText()); + JsonNode cpuResult = retrieveAgentCpu(agent.asText()); + JsonNode memResult = retrieveAgentMemory(agent.asText()); JsonNode diskResult = queryAgentDisk(agent.asText()); - JsonNode diskIOResult = queryAgentDiskIO(agent.asText()); ObjectNode temp = objectMapper.createObjectNode(); - - // hostInfo + // HOST temp.put("hostname", cpuResult.get("hostname").asText()); temp.put("iPv4addr", cpuResult.get("iPv4addr").asText()); temp.put("cpuInfo", cpuResult.get("cpuInfo").asText().strip()); - temp.put("time", cpuResult.get("time").asText()); - temp.put("cpuLoadAvgMin_1", cpuResult.get("cpuLoadAvgMin_1").asDouble()); - temp.put("cpuLoadAvgMin_5", cpuResult.get("cpuLoadAvgMin_5").asDouble()); - temp.put("cpuLoadAvgMin_15", cpuResult.get("cpuLoadAvgMin_15").asDouble()); - temp.put("cpuUsage", cpuResult.get("cpuUsage").asDouble()); - temp.put("fileTotalDescriptor", cpuResult.get("fileTotalDescriptor").asLong()); - temp.put("fileOpenDescriptor", cpuResult.get("fileOpenDescriptor").asLong()); + // temp.put("iPv6addr", cpuResult.get("iPv6addr").asText()); + temp.put("os", cpuResult.get("os").asText()); + temp.put("architecture", cpuResult.get("architecture").asText()); + temp.put(PHYSICAL_CORES, cpuResult.get(PHYSICAL_CORES).asText()); // MEM - temp.put("memIdle", memResult.get("memIdle").asLong()); - temp.put("memTotal", memResult.get("memTotal").asLong()); + temp.put(MEM_TOTAL, memResult.get(MEM_TOTAL).asLong()); // DISK - temp.set("diskSpace", diskResult.get("diskInfo")); - // DISK IO - temp.set("diskIO", diskIOResult.get("diskIO")); + temp.set(DISK_TOTAL, diskResult.get(DISK_TOTAL)); agentsInfo.add(temp); } return agentsInfo; } + /** + * query agents dynamic info + */ + public JsonNode queryAgentsInfo(String pace){ + JsonNode agents = queryAgents(); + JsonNode agentsIpv4 = agents.get("iPv4addr"); + ObjectMapper mapper = new ObjectMapper(); + ArrayNode agentsInfo = mapper.createArrayNode(); + for (JsonNode agentIpv4 : agentsIpv4) { + ObjectNode ag = mapper.createObjectNode(); + double[] agentsCpuUsage = new double[6]; + double[] agentsCpuLoad1 = new double[6]; + double[] agentsCpuLoad2 = new double[6]; + double[] agentsCpuLoad3 = new double[6]; + long[] agentMemIdle = new long[6]; + long[] agentMemTotal = new long[6]; + long[] agentDiskRead = new long[6]; + long[] agentDiskWrite = new long[6]; + // real-time cpuUsage + JsonNode agentCpu = retrieveAgentCpu(agentIpv4.asText()); + // real-time mem + JsonNode agentMem = retrieveAgentMemory(agentIpv4.asText()); + // real-time disk + JsonNode agentDisk = queryAgentDisk(agentIpv4.asText()); + // real-time diskIO + JsonNode agentDiskIO = queryAgentDiskIO(agentIpv4.asText()); + + // dynamic + JsonNode agentCpuInterval = retrieveAgentCpu(agentIpv4.asText(), pace); + JsonNode agentMemInterval = retrieveAgentMemory(agentIpv4.asText(),pace); + JsonNode agentDiskIOInterval = queryAgentDiskIO(agentIpv4.asText(),pace); + for (int i = 0; i < 6; i++) { + // CPU + System.out.println(agentCpuInterval); + agentsCpuUsage[i] = ProxyUtils.getDoubleSafely(agentCpuInterval, CPU_USAGE, i); + agentsCpuLoad1[i] = ProxyUtils.getDoubleSafely(agentCpuInterval, CPU_LOAD_AVG_MIN_1, i); + agentsCpuLoad2[i] = ProxyUtils.getDoubleSafely(agentCpuInterval, CPU_LOAD_AVG_MIN_5, i); + agentsCpuLoad3[i] = ProxyUtils.getDoubleSafely(agentCpuInterval, CPU_LOAD_AVG_MIN_15, i); + + // MEM + agentMemIdle[i] = ProxyUtils.getLongSafely(agentMemInterval, MEM_IDLE, i); + agentMemTotal[i] = ProxyUtils.getLongSafely(agentMemInterval, MEM_TOTAL, i); + + // DISK IO + agentDiskRead[i] = ProxyUtils.getLongSafely(agentDiskIOInterval, DISK_READ, i); + agentDiskWrite[i] = ProxyUtils.getLongSafely(agentDiskIOInterval, DISK_WRITE, i); + } + // cur + ag.put("cpu_usage_cur", agentCpu.get(CPU_USAGE).asDouble()); + ag.put("memory_usage_cur", (double)(agentMem.get(MEM_TOTAL).asLong() - agentMem.get(MEM_IDLE).asLong()) / agentMem.get(MEM_TOTAL).asLong()); + ag.put("disk_usage_cur",(double) (agentDisk.get(DISK_TOTAL).asLong() - agentDisk.get(DISK_IDLE).asLong()) / agentDisk.get(DISK_TOTAL).asLong()); + ag.put("file_descriptor_usage",(double)agentCpu.get(FILE_OPEN_DESCRIPTOR).asLong() / agentCpu.get(FILE_TOTAL_DESCRIPTOR).asLong()); + ag.put("disk_read",agentDiskIO.get(DISK_READ).asLong()); + ag.put("disk_read",agentDiskIO.get(DISK_WRITE).asLong()); + // cpu + ag.set("cpu_usage",ProxyUtils.array2node(agentsCpuUsage)); + ag.set("system_load1",ProxyUtils.array2node(agentsCpuLoad1)); + ag.set("system_load2",ProxyUtils.array2node(agentsCpuLoad2)); + ag.set("system_load3",ProxyUtils.array2node(agentsCpuLoad3)); + // mem + ag.set("memory_usage",ProxyUtils.array2node(agentMemIdle,agentMemTotal)); + // disk io + ag.set("disk_read",ProxyUtils.array2node(agentDiskRead)); + ag.set("disk_write",ProxyUtils.array2node(agentDiskWrite)); + + agentsInfo.add(ag); + } + return agentsInfo; + } /** * query cluster info */ - public JsonNode queryClusterInfo(String clusterId, String step) { - JsonNode agentsIpv4 = bigtopProxy.queryClusterAgentsList(clusterId).get("hosts"); // cluster's agents + public JsonNode queryClusterInfo(String clusterId, String pace) { + JsonNode agents = bigtopProxy.queryClusterAgentsList(clusterId).get("agents"); // cluster's agents + int agentsNum = bigtopProxy.queryClusterAgentsList(clusterId).get("agentsNum").asInt(); // agentsNum ObjectMapper mapper = new ObjectMapper(); - int totalCpuCores = 0; + int total_physical_cores = 0; long totalMemSpace = 0L; long totalDiskSpace = 0L; long totalMemIdle = 0L; double instantCpuUsage = 0.0; - + double[][] agentsCpuUsage = new double[agentsNum][6]; + double[][] agentsCpuLoad1 = new double[agentsNum][6]; + double[][] agentsCpuLoad2 = new double[agentsNum][6]; + double[][] agentsCpuLoad3 = new double[agentsNum][6]; + long[][] agentMemIdle = new long[agentsNum][6]; + long[][] agentMemTotal = new long[agentsNum][6]; + int agentIndex = 0; ObjectNode clusterInfo = mapper.createObjectNode(); - for (JsonNode agentIpv4 : agentsIpv4) { - JsonNode agentCpuStep = queryAgentCpu(agentIpv4.asText(), step); - // JsonNode agentMemStep = queryAgentMemory(agentsIpv4.asText(),step); - JsonNode agentMem = queryAgentMemory(agentsIpv4.asText()); - JsonNode agentDisk = queryAgentDisk(agentIpv4.asText()); - JsonNode agentCpu = queryAgentCpu(agentIpv4.asText()); - + for (JsonNode agentIpv4 : agents) { + // real-time cpuUsage + JsonNode agentCpu = retrieveAgentCpu(agentIpv4.asText()); instantCpuUsage += agentCpu.get("cpuUsage").asDouble() - * agentCpuStep.get("physical_cores").asInt(); + * agentCpu.get(PHYSICAL_CORES).asInt(); + // real-time mem + JsonNode agentMem = retrieveAgentMemory(agents.asText()); totalMemIdle += agentMem.get("memIdle").asLong(); - totalCpuCores += agentCpuStep.get("physical_cores").asInt(); totalMemSpace += agentMem.get(("memTotal")).asLong(); - for (JsonNode diskInfo : agentDisk.get("diskInfo")) { - if (Objects.equals(diskInfo.get("diskTotalSpace").asText(), "diskTotalSpace")) { - totalDiskSpace += diskInfo.get("diskValue").asLong(); - } + // real-time disk + JsonNode agentDisk = queryAgentDisk(agentIpv4.asText()); + totalDiskSpace += agentDisk.get(DISK_TOTAL).asLong(); + + // avg time + JsonNode agentCpuStep = retrieveAgentCpu(agentIpv4.asText(), pace); + JsonNode agentMemStep = retrieveAgentMemory(agents.asText(),pace); + int agent_physical_cores = agentCpuStep.get(PHYSICAL_CORES).asInt(); + total_physical_cores += agent_physical_cores; + for (int i = 0; i < 6; i++) { + // CPU + agentsCpuUsage[agentIndex][i] = ProxyUtils.getDoubleSafely(agentCpuStep, CPU_USAGE, i) * agent_physical_cores; + agentsCpuLoad1[agentIndex][i] = ProxyUtils.getDoubleSafely(agentCpuStep, CPU_LOAD_AVG_MIN_1, i) * agent_physical_cores; + agentsCpuLoad2[agentIndex][i] = ProxyUtils.getDoubleSafely(agentCpuStep, CPU_LOAD_AVG_MIN_5, i) * agent_physical_cores; + agentsCpuLoad3[agentIndex][i] = ProxyUtils.getDoubleSafely(agentCpuStep, CPU_LOAD_AVG_MIN_15, i) * agent_physical_cores; + + // MEM + agentMemIdle[agentIndex][i] = ProxyUtils.getLongSafely(agentMemStep, MEM_IDLE, i); + agentMemTotal[agentIndex][i] = ProxyUtils.getLongSafely(agentMemStep, MEM_TOTAL, i); } - - // JsonNode agentCpuUsage = agentCpuStep.get("cpuUsage"); - // JsonNode memIdle = agentMemStep.get("memIdle"); - // JsonNode memTotal = agentMemStep.get("memTotal"); - // for(int i = 0; i < 7;i++){ - // if (i < memUsage.size()) { - // - // long currentMemIdleValue = memIdle.get(i).asLong(); - // long currentMemTotalValue = memTotal.get(i).asLong(); - // long newValue = - // - // - // ((ArrayNode) memUsage).set(i, ); - // } else if (indexToUpdate < arrayNode.size()) { - // ((ArrayNode) arrayNode).set(indexToUpdate, factory.numberNode(valueToAdd)); - // } - // mem.add(i,mem.get(i) + memIdle.get(0).asLong() ); } - - clusterInfo.put("total_physical_cores", totalCpuCores); + // static + clusterInfo.put("total_physical_cores", total_physical_cores); clusterInfo.put("total_memory", totalMemSpace); clusterInfo.put("total_disk", totalDiskSpace); - clusterInfo.put("cpu_usage_cur", instantCpuUsage / totalCpuCores); - clusterInfo.put("memory_usage_cur", totalMemIdle / totalMemSpace); - // clusterInfo.put("cpu_usage",); - // clusterInfo.put("memory_usage",); - return clusterInfo; - } - /** - * query cpu filter by step - */ - public JsonNode queryAgentCpu(String iPv4addr, String step) { - String params = String.format("agent_host_monitoring_cpu{iPv4addr=\"%s\"})", iPv4addr); - ArrayList timeStampsList = getTimeStampsList(Integer.parseInt(step)); // sum 8 and between 7 - JsonNode result = queryRange( - params, timeStampsList.get(timeStampsList.size() - 1), timeStampsList.get(0), step); // end start - ObjectMapper objectMapper = new ObjectMapper(); - if (result != null) { - JsonNode agentCpu = result.get("data").get("result"); // differ type cpu - if (agentCpu.isArray() && !agentCpu.isEmpty()) { - ObjectNode agentCpuInfo = objectMapper.createObjectNode(); - // metric - JsonNode agentCpuMetrics = agentCpuInfo.get(0).get("metric"); - agentCpuInfo.put("hostname", agentCpuMetrics.get("hostname").asText()); - agentCpuInfo.put("cpuInfo", agentCpuMetrics.get("cpu_info").asText()); - agentCpuInfo.put("iPv4addr", agentCpuMetrics.get("iPv4addr").asText()); - agentCpuInfo.put("os", agentCpuMetrics.get("os").asText()); - agentCpuInfo.put("architecture", agentCpuMetrics.get("arch").asText()); - agentCpuInfo.put( - "physical_cores", agentCpuMetrics.get("physical_cores").asText()); - agentCpuInfo.put( - "fileOpenDescriptor", - agentCpuMetrics.get("fileOpenDescriptor").asLong()); - agentCpuInfo.put( - "fileTotalDescriptor", - agentCpuMetrics.get("fileTotalDescriptor").asLong()); + // cur + clusterInfo.put("cpu_usage_cur", instantCpuUsage / total_physical_cores); + clusterInfo.put("memory_usage_cur", (double) (totalMemSpace - totalMemIdle)/ totalMemSpace); - // value - for (JsonNode cpuType : agentCpu) { - JsonNode agentCpuValues = cpuType.get("values"); - ArrayNode cpuValues = objectMapper.createArrayNode(); - for (JsonNode stepValue : agentCpuValues) { // by step - cpuValues.add(stepValue.get(1).asDouble()); - } - agentCpuInfo.set(cpuType.get("metric").get("cpuUsage").asText(), cpuValues); - return agentCpuInfo; - } - } - } - return objectMapper.createObjectNode(); + // cpu + clusterInfo.set("cpu_usage",ProxyUtils.array2node(agentsCpuUsage,total_physical_cores,agentsNum)); + clusterInfo.set("system_load1",ProxyUtils.array2node(agentsCpuLoad1,total_physical_cores,agentsNum)); + clusterInfo.set("system_load2",ProxyUtils.array2node(agentsCpuLoad2,total_physical_cores,agentsNum)); + clusterInfo.set("system_load3",ProxyUtils.array2node(agentsCpuLoad3,total_physical_cores,agentsNum)); + + // mem + clusterInfo.set("memory_usage",ProxyUtils.array2node(agentMemIdle,agentMemTotal,agentsNum)); + return clusterInfo; } + /** - * query agent cpu by ipv4 + * retrieve cpu */ - public JsonNode queryAgentCpu(String iPv4addr) { + public JsonNode retrieveAgentCpu(String iPv4addr) { String params = String.format("agent_host_monitoring_cpu{iPv4addr=\"%s\"}", iPv4addr); JsonNode result = query(params); ObjectMapper objectMapper = new ObjectMapper(); if (result != null) { JsonNode agentCpus = result.get("data").get("result"); - if (agentCpus.isArray() && !agentCpus.isEmpty()) { // differ cpu on an agent + if (agentCpus.isArray() && !agentCpus.isEmpty()) { // metric JsonNode agentCpuMetric = agentCpus.get(0).get("metric"); ObjectNode agentInfo = objectMapper.createObjectNode(); @@ -324,128 +355,156 @@ public JsonNode queryAgentCpu(String iPv4addr) { agentInfo.put("os", agentCpuMetric.get("os").asText()); agentInfo.put("architecture", agentCpuMetric.get("arch").asText()); agentInfo.put( - "physical_cores", agentCpuMetric.get("physical_cores").asText()); + PHYSICAL_CORES, agentCpuMetric.get(PHYSICAL_CORES).asText()); agentInfo.put( - "fileOpenDescriptor", - agentCpuMetric.get("fileOpenDescriptor").asLong()); + FILE_OPEN_DESCRIPTOR, + agentCpuMetric.get(FILE_OPEN_DESCRIPTOR).asLong()); agentInfo.put( - "fileTotalDescriptor", - agentCpuMetric.get("fileTotalDescriptor").asLong()); + FILE_TOTAL_DESCRIPTOR, + agentCpuMetric.get(FILE_TOTAL_DESCRIPTOR).asLong()); // value - JsonNode agentCpuValue = agentCpus.get(0).get("value"); - LocalDateTime instant = Instant.ofEpochSecond( - agentCpuValue.get(0).asLong()) - .atZone(ZoneId.systemDefault()) - .toLocalDateTime(); - agentInfo.put("time", instant.toString()); for (JsonNode agent : agentCpus) { agentInfo.put( agent.get("metric").get("cpuUsage").asText(), - agent.get("value").get(1).asDouble()); // cpu metric + agent.get("value").get(1).asDouble()); } return agentInfo; } } return objectMapper.createObjectNode(); } - /** - * query agent memory by ipv4 + * retrieve cpu internal */ - public JsonNode queryAgentMemory(String iPv4addr) { + public JsonNode retrieveAgentCpu(String iPv4addr, String pace) { + String params = String.format("agent_host_monitoring_cpu{iPv4addr=\"%s\"}",iPv4addr); + ArrayList timeStampsList = ProxyUtils.getTimeStampsList(Integer.parseInt(pace)); + JsonNode result = queryRange( + params, timeStampsList.get(timeStampsList.size() - 1), timeStampsList.get(0), ProxyUtils.Number2Param(Integer.parseInt(pace))); // end start ObjectMapper objectMapper = new ObjectMapper(); + if (result != null) { + JsonNode agentCpu = result.get("data").get("result"); + if (agentCpu.isArray() && !agentCpu.isEmpty()) { + ObjectNode agentCpuInfo = objectMapper.createObjectNode(); + // metric + JsonNode agentCpuMetrics = agentCpu.get(0).get("metric"); + agentCpuInfo.put("hostname", agentCpuMetrics.get("hostname").asText()); + agentCpuInfo.put("cpuInfo", agentCpuMetrics.get("cpu_info").asText()); + agentCpuInfo.put("iPv4addr", agentCpuMetrics.get("iPv4addr").asText()); + agentCpuInfo.put("os", agentCpuMetrics.get("os").asText()); + agentCpuInfo.put("architecture", agentCpuMetrics.get("arch").asText()); + agentCpuInfo.put( + PHYSICAL_CORES, + agentCpuMetrics.get(PHYSICAL_CORES).asInt()); + agentCpuInfo.put( + FILE_OPEN_DESCRIPTOR, + agentCpuMetrics.get(FILE_OPEN_DESCRIPTOR).asLong()); + agentCpuInfo.put( + FILE_TOTAL_DESCRIPTOR, + agentCpuMetrics.get(FILE_TOTAL_DESCRIPTOR).asLong()); + + // value + for (JsonNode cpuType : agentCpu) { + JsonNode agentCpuValues = cpuType.get("values"); + ArrayNode cpuValues = objectMapper.createArrayNode(); + for (JsonNode stepValue : agentCpuValues) { + cpuValues.add(stepValue.get(1).asDouble()); + } + agentCpuInfo.set(cpuType.get("metric").get("cpuUsage").asText(), cpuValues); + } + return agentCpuInfo; + } + } + return objectMapper.createObjectNode(); + } + + + /** + * retrieve memory + */ + public JsonNode retrieveAgentMemory(String iPv4addr) { String query = String.format("agent_host_monitoring_mem{iPv4addr=\"%s\"}", iPv4addr); JsonNode result = query(query); + ObjectMapper objectMapper = new ObjectMapper(); if (result != null) { JsonNode agentsMem = result.get("data").get("result"); if (agentsMem.isArray() && !agentsMem.isEmpty()) { - JsonNode agentMemValue = agentsMem.get(0).get("value"); + ObjectNode agentsMemInfo = objectMapper.createObjectNode(); + // metric JsonNode agentMemMetric = agentsMem.get(0).get("metric"); - ObjectNode agentsInfo = objectMapper.createObjectNode(); - agentsInfo.put("hostname", agentMemMetric.get("hostname").asText()); - agentsInfo.put("iPv4addr", agentMemMetric.get("iPv4addr").asText()); - LocalDateTime instant = Instant.ofEpochSecond( - agentMemValue.get(0).asLong()) - .atZone(ZoneId.systemDefault()) - .toLocalDateTime(); - agentsInfo.put("time", instant.toString()); + agentsMemInfo.put("hostname", agentMemMetric.get("hostname").asText()); + agentsMemInfo.put("iPv4addr", agentMemMetric.get("iPv4addr").asText()); for (JsonNode agent : agentsMem) { - agentsInfo.put( + agentsMemInfo.put( agent.get("metric").get("memUsage").asText(), agent.get("value").get(1).asLong()); // mem metric } - return agentsInfo; + return agentsMemInfo; } } return objectMapper.createObjectNode(); } - /** - * query mem filter by step + * retrieve memory internal */ - public JsonNode queryAgentMemory(String iPv4addr, String step) { + public JsonNode retrieveAgentMemory(String iPv4addr, String pace) { String params = String.format("agent_host_monitoring_mem{iPv4addr=\"%s\"}", iPv4addr); - ArrayList timeStampsList = getTimeStampsList(Integer.parseInt(step)); // sum 8 and between 7 + ArrayList timeStampsList = ProxyUtils.getTimeStampsList(Integer.parseInt(pace)); // sum 8 and between 7 JsonNode result = queryRange( - params, timeStampsList.get(timeStampsList.size() - 1), timeStampsList.get(0), step); // end start + params, timeStampsList.get(timeStampsList.size() - 1), timeStampsList.get(0), ProxyUtils.Number2Param(Integer.parseInt(pace))); // end start ObjectMapper objectMapper = new ObjectMapper(); if (result != null) { JsonNode agentMem = result.get("data").get("result"); - if (agentMem.isArray() && !agentMem.isEmpty()) { // differ memory + if (agentMem.isArray() && !agentMem.isEmpty()) { ObjectNode agentMemInfo = objectMapper.createObjectNode(); // metric JsonNode agentMemMetrics = agentMem.get(0).get("metric"); agentMemInfo.put("hostname", agentMemMetrics.get("hostname").asText()); agentMemInfo.put("iPv4addr", agentMemMetrics.get("iPv4addr").asText()); - agentMemInfo.put("memTotal", agentMemMetrics.get("memTotal").asText()); // value for (JsonNode stepAgent : agentMem) { - JsonNode agentMemValues = stepAgent.get("value"); + JsonNode agentMemValues = stepAgent.get("values"); ArrayNode memValues = objectMapper.createArrayNode(); for (JsonNode value : agentMemValues) { memValues.add(value.get(1).asDouble()); } - agentMemInfo.set(agentMemMetrics.get("memUsage").asText(), memValues); - return agentMemInfo; + agentMemInfo.set(stepAgent.get("metric").get("memUsage").asText(), memValues); } + return agentMemInfo; } } return objectMapper.createObjectNode(); } /** - * query agent disk by ipv4 + * retrieve diskInfo */ public JsonNode queryAgentDisk(String iPv4addr) { - ObjectMapper objectMapper = new ObjectMapper(); String params = String.format("agent_host_monitoring_disk{iPv4addr=\"%s\"}", iPv4addr); JsonNode result = query(params); + ObjectMapper objectMapper = new ObjectMapper(); if (result != null) { JsonNode agentDisksResult = result.get("data").get("result"); if (agentDisksResult.isArray() && !agentDisksResult.isEmpty()) { - JsonNode agentDisksMetric = agentDisksResult.get(0).get("metric"); - JsonNode agentDisksValue = agentDisksResult.get(0).get("value"); ObjectNode agentDiskInfo = objectMapper.createObjectNode(); + // metric + JsonNode agentDisksMetric = agentDisksResult.get(0).get("metric"); agentDiskInfo.put("hostname", agentDisksMetric.get("hostname").asText()); agentDiskInfo.put("iPv4addr", agentDisksMetric.get("iPv4addr").asText()); - LocalDateTime instant = Instant.ofEpochSecond( - agentDisksValue.get(0).asLong()) - .atZone(ZoneId.systemDefault()) - .toLocalDateTime(); - agentDiskInfo.put("time", instant.toString()); - ArrayNode tempDiskInfo = objectMapper.createArrayNode(); + // value + Long diskTotalSpace = 0L,diskFreeSpace = 0L; for (JsonNode agent : agentDisksResult) { - JsonNode agentDisk = agent.get("metric"); - ObjectNode temp = objectMapper.createObjectNode(); - temp.put("diskName", agentDisk.get("diskName").asText()); - temp.put("diskUsage", agentDisk.get("diskUsage").asText()); - temp.put("diskValue", agent.get("value").get(1).asLong()); - tempDiskInfo.add(temp); + if (Objects.equals(agent.get("metric").get("diskUsage").asText(), DISK_IDLE)) { + diskFreeSpace += agent.get("value").get(1).asLong(); + } else { + diskTotalSpace += agent.get("value").get(1).asLong(); + } } - agentDiskInfo.set("diskInfo", tempDiskInfo); + agentDiskInfo.put(DISK_TOTAL, diskTotalSpace); + agentDiskInfo.put(DISK_IDLE, diskFreeSpace); return agentDiskInfo; } } @@ -453,69 +512,76 @@ public JsonNode queryAgentDisk(String iPv4addr) { } /** - * query agent diskIO by ipv4 + * retrieve diskIO */ public JsonNode queryAgentDiskIO(String iPv4addr) { - ObjectMapper objectMapper = new ObjectMapper(); String params = String.format("agent_host_monitoring_diskIO{iPv4addr=\"%s\"}", iPv4addr); JsonNode result = query(params); + ObjectMapper objectMapper = new ObjectMapper(); if (result != null) { JsonNode agentDisksResult = result.get("data").get("result"); if (agentDisksResult.isArray() && !agentDisksResult.isEmpty()) { - JsonNode agentDisksValue = agentDisksResult.get(0).get("value"); - JsonNode agentDisksMetric = agentDisksResult.get(0).get("metric"); ObjectNode agentDiskIOInfo = objectMapper.createObjectNode(); + // metric + JsonNode agentDisksMetric = agentDisksResult.get(0).get("metric"); agentDiskIOInfo .put("hostname", agentDisksMetric.get("hostname").asText()) .put("iPv4addr", agentDisksMetric.get("iPv4addr").asText()); - LocalDateTime instant = Instant.ofEpochSecond( - agentDisksValue.get(0).asLong()) - .atZone(ZoneId.systemDefault()) - .toLocalDateTime(); - agentDiskIOInfo.put("time", instant.toString()); - ArrayNode tempDiskInfo = objectMapper.createArrayNode(); - for (JsonNode agent : agentDisksResult) { - JsonNode agentDisk = agent.get("metric"); - ObjectNode temp = objectMapper.createObjectNode(); - temp.put( - "physicalDiskName", - agentDisk.get("physicalDiskName").asText()); - temp.put("physicalDiskUsage", agentDisk.get("diskIO").asText()); - if (Objects.equals(agentDisk.get("diskIO").asText(), "physicalDiskWrite")) { - temp.put("physicalDiskWrite", agent.get("value").get(1).asLong()); + // value + long diskWrite = 0L; + long diskRead = 0L; + for (JsonNode disk : agentDisksResult) { + if (Objects.equals(disk.get("metric").get("diskIO").asText(), DISK_WRITE)) { + diskWrite += disk.get("value").get(1).asLong(); } else { - temp.put("physicalDiskRead", agent.get("value").get(1).asLong()); + diskRead += disk.get("value").get(1).asLong(); } - tempDiskInfo.add(temp); } - agentDiskIOInfo.set("diskIO", tempDiskInfo); + agentDiskIOInfo.put(DISK_WRITE,diskWrite); + agentDiskIOInfo.put(DISK_READ,diskRead); return agentDiskIOInfo; } } return objectMapper.createObjectNode(); } - /** - * get time step + * retrieve diskIO internal */ - public ArrayList getTimeStampsList(int step) { - // format - String currentTimeStr = LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")); - String currentDateStr = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - LocalDateTime currentDateTime = LocalDateTime.parse(currentDateStr + " " + currentTimeStr, formatter); - int roundedMinute = (currentDateTime.getMinute() / step) * step; - LocalDateTime roundedCurrentDateTime = - currentDateTime.withMinute(roundedMinute).withSecond(0).withNano(0); - // get 8 node - ArrayList timestamps = new ArrayList<>(); - ZoneId zid = ZoneId.systemDefault(); - for (int i = 0; i < 7; i++) { - LocalDateTime pastTime = roundedCurrentDateTime.minus(Duration.ofMinutes((long) step * i)); - long timestamp = pastTime.atZone(zid).toInstant().toEpochMilli() / 1000L; - timestamps.add(timestamp); + public JsonNode queryAgentDiskIO(String iPv4addr,String step) { + String params = String.format("agent_host_monitoring_diskIO{iPv4addr=\"%s\"}", iPv4addr); + ArrayList timeStampsList = ProxyUtils.getTimeStampsList(Integer.parseInt(step)); + JsonNode result = queryRange( + params, timeStampsList.get(timeStampsList.size() - 1), timeStampsList.get(0), ProxyUtils.Number2Param(Integer.parseInt(step))); + ObjectMapper objectMapper = new ObjectMapper(); + if (result != null) { + JsonNode agentDisksResult = result.get("data").get("result"); + if (agentDisksResult.isArray() && !agentDisksResult.isEmpty()) { + ObjectNode agentDiskIOInfo = objectMapper.createObjectNode(); + // metric + JsonNode agentDisksMetric = agentDisksResult.get(0).get("metric"); + agentDiskIOInfo + .put("hostname", agentDisksMetric.get("hostname").asText()) + .put("iPv4addr", agentDisksMetric.get("iPv4addr").asText()); + + // value + long[] diskWrite = new long[6]; + long[] diskRead = new long[6]; + for (JsonNode disk : agentDisksResult) { + if (Objects.equals(disk.get("metric").get("diskIO").asText(), DISK_WRITE)) { + for (int i = 0 ; i<6;i++) + diskWrite[i] += disk.get("values").get(i).get(1).asLong(); + } else { + for (int i = 0 ; i<6;i++) + diskRead[i] += disk.get("values").get(i).get(1).asLong(); + } + } + agentDiskIOInfo.set(DISK_WRITE,ProxyUtils.array2node(diskWrite)); + agentDiskIOInfo.set(DISK_READ,ProxyUtils.array2node(diskRead)); + return agentDiskIOInfo; + } } - return timestamps; + return objectMapper.createObjectNode(); } + } diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/utils/ProxyUtils.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/utils/ProxyUtils.java new file mode 100644 index 00000000..6cd9b4f4 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/utils/ProxyUtils.java @@ -0,0 +1,102 @@ +package org.apache.bigtop.manager.server.utils; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; + +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; + +public class ProxyUtils { + public static double getDoubleSafely(JsonNode parentNode, String key, int index) { + JsonNode listNode = parentNode.get(key); + if (listNode != null && listNode.isArray() && index < listNode.size()) + return listNode.get(index).asDouble(); + return 0.0; + } + public static Long getLongSafely(JsonNode parentNode, String key, int index) { + JsonNode listNode = parentNode.get(key); + if (listNode != null && listNode.isArray() && index < listNode.size()) + return listNode.get(index).asLong(); + return 0L; + } + public static JsonNode array2node(double[][] array,int num,int cores){ + ObjectMapper mapper = new ObjectMapper(); + double[] cache = new double[6]; + for(int i = 0; i< num;i++) + for(int j = 0;j<6;j++) + cache[j] += array[i][j]; + ArrayNode node = mapper.createArrayNode(); + // 数据排序为日期小在前,日期大在后 + for(int j = 0;j<6;j++) + node.add(cache[j] / cores); + return node; + } + public static JsonNode array2node(double[] array){ + ArrayNode node = new ObjectMapper().createArrayNode(); + for(int j = 0;j<6;j++) + node.add(array[j]); + return node; + } + public static JsonNode array2node(long[] array){ + ArrayNode node = new ObjectMapper().createArrayNode(); + for(int j = 0;j<6;j++) + node.add(array[j]); + return node; + } + public static JsonNode array2node(long[] array1,long[] array2){ + ArrayNode node = new ObjectMapper().createArrayNode(); + for(int j = 0;j<6;j++) + if(array2[j] <= 0) + node.add(0.0); + else + node.add((double) ( array2[j] - array1[j]) / array2[j]); + return node; + } + public static JsonNode array2node(long[][] array1,long[][] array2,int num){ + ObjectMapper mapper = new ObjectMapper(); + long[] cache1 = new long[6]; + long[] cache2 = new long[6]; + for(int i = 0; i< num;i++) { + for (int j = 0; j < 6; j++) { + cache1[j] += array1[i][j]; + cache2[j] += array2[i][j]; + } + } + ArrayNode node = mapper.createArrayNode(); + // The data is sorted with earlier dates coming first and later dates following. + for(int j = 0;j < 6;j++) + if(cache2[j] <= 0) + node.add(0.0); + else + node.add((double) (cache2[j] - cache1[j]) / cache2[j]); + return node; + } + public static ArrayList getTimeStampsList(int step) { + // format + String currentTimeStr = LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")); + String currentDateStr = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + LocalDateTime currentDateTime = LocalDateTime.parse(currentDateStr + " " + currentTimeStr, formatter); + int roundedMinute = (currentDateTime.getMinute() / step) * step; + LocalDateTime roundedCurrentDateTime = + currentDateTime.withMinute(roundedMinute).withSecond(0).withNano(0); + // get 8 point + ArrayList timestamps = new ArrayList<>(); + ZoneId zid = ZoneId.systemDefault(); + for (int i = 0; i < 7; i++) { + LocalDateTime pastTime = roundedCurrentDateTime.minus(Duration.ofMinutes((long) step * i)); + long timestamp = pastTime.atZone(zid).toInstant().toEpochMilli() / 1000L; + timestamps.add(timestamp); + } + return timestamps; + } + public static String Number2Param(int step){ + return String.format("%sm",step); + } +}