diff --git a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/AgentApplication.java b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/AgentApplication.java index 39acaf52..3f965a3f 100644 --- a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/AgentApplication.java +++ b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/AgentApplication.java @@ -19,7 +19,6 @@ package org.apache.bigtop.manager.agent; import org.apache.bigtop.manager.agent.monitoring.AgentHostMonitoring; -import org.apache.bigtop.manager.agent.monitoring.ZookeeperHealthyMonitoring; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; @@ -54,9 +53,4 @@ public static void main(String[] args) { @Qualifier("memMultiGauge") public MultiGauge memMultiGauge(MeterRegistry meterRegistry) { return AgentHostMonitoring.newMemMultiGauge(meterRegistry); } - - @Bean - @Qualifier("zookeeperMultiGauge") public MultiGauge zookeeperMultiGauge(MeterRegistry meterRegistry) { - return ZookeeperHealthyMonitoring.registerZookeeperHealthyGauge(meterRegistry); - } } diff --git a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/metrics/MetricsCollector.java b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/metrics/MetricsCollector.java index 7028af56..e1ef7c27 100644 --- a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/metrics/MetricsCollector.java +++ b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/metrics/MetricsCollector.java @@ -19,7 +19,6 @@ package org.apache.bigtop.manager.agent.metrics; import org.apache.bigtop.manager.agent.monitoring.AgentHostMonitoring; -import org.apache.bigtop.manager.agent.monitoring.ZookeeperHealthyMonitoring; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Async; @@ -44,9 +43,6 @@ public class MetricsCollector { @Resource @Qualifier("cpuMultiGauge") private MultiGauge cpuMultiGauge; - @Resource - @Qualifier("zookeeperMultiGauge") private MultiGauge zookeeperMultiGauge; - @Async @Scheduled(cron = "*/10 * * * * ?") public void collect() { @@ -58,6 +54,5 @@ private void scrape() { AgentHostMonitoring.diskMultiGaugeUpdateData(diskMultiGauge); AgentHostMonitoring.memMultiGaugeUpdateData(memMultiGauge); AgentHostMonitoring.cpuMultiGaugeUpdateData(cpuMultiGauge); - ZookeeperHealthyMonitoring.zookeeperUpdateStatus(zookeeperMultiGauge); } } diff --git a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/monitoring/ZookeeperHealthyMonitoring.java b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/monitoring/ZookeeperHealthyMonitoring.java deleted file mode 100644 index a8d87c20..00000000 --- a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/monitoring/ZookeeperHealthyMonitoring.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.bigtop.manager.agent.monitoring; - -import org.apache.bigtop.manager.common.constants.CacheFiles; -import org.apache.bigtop.manager.common.constants.Constants; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.MultiGauge; -import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.Tags; -import lombok.extern.slf4j.Slf4j; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -import static java.nio.charset.StandardCharsets.UTF_8; - -@Slf4j -public class ZookeeperHealthyMonitoring { - - private static final int STATUS_FOLLOWER = 2; - private static final int STATUS_LEADER = 1; - private static final int ZOOKEEPER_UNHEALTHY = 0; - - private static final String ZOOKEEPER_HEALTHY_MONITORING_NAME = "zookeeper_monitoring"; - - public static void zookeeperUpdateStatus(MultiGauge multiGauge) { - Socket sock = null; - BufferedReader reader = null; - try { - JsonNode hostInfo = AgentHostMonitoring.getHostInfo().get(AgentHostMonitoring.AGENT_BASE_INFO); - String hostName = hostInfo.get("hostname").asText(); - String ipv4 = hostInfo.get("iPv4addr").asText(); - JsonNode hostComponent = getHostComponent(); - if (null == hostComponent || hostComponent.isEmpty()) return; - Properties zooCFG = getZooCFG(hostComponent); - String zookeeperPort = zooCFG.getProperty("clientPort", "2181"); - InetSocketAddress hostAddress = new InetSocketAddress(hostName, Integer.parseInt(zookeeperPort)); - sock = new Socket(); - sock.connect(hostAddress, 3000); - sock.setSoTimeout(3000); - OutputStream outStream = sock.getOutputStream(); - outStream.write("srvr".getBytes(UTF_8)); - outStream.flush(); - sock.shutdownOutput(); - reader = new BufferedReader(new InputStreamReader(sock.getInputStream())); - List tags = new ArrayList<>(); - tags.add(Tag.of("host_name", hostName)); - tags.add(Tag.of("ipv4", ipv4)); - tags.add(Tag.of("client_port", zookeeperPort)); - List lines = reader.lines().toList(); - int final_status = ZOOKEEPER_UNHEALTHY; - for (String line : lines) { - if (line.contains("Mode")) { - String mode = line.split(":")[1].trim(); - final_status = mode.contains("leader") ? STATUS_LEADER : STATUS_FOLLOWER; - tags.add(Tag.of("mode", mode)); - } - if (line.contains("version")) { - tags.add(Tag.of("version", line.split(":")[1].trim())); - } - } - ArrayList> rows = new ArrayList<>(); - rows.add(MultiGauge.Row.of(Tags.of(tags), final_status)); - multiGauge.register(rows, true); - } catch (Exception e) { - log.error("Exception while executing four letter word: srvr", e); - } finally { - try { - if (sock != null) { - sock.close(); - } - if (reader != null) { - reader.close(); - } - } catch (IOException ignored) { - } - } - } - - public static MultiGauge registerZookeeperHealthyGauge(MeterRegistry registry) { - return MultiGauge.builder(ZOOKEEPER_HEALTHY_MONITORING_NAME) - .description("BigTop Manager Zookeeper Healthy Monitoring, 0:unhealthy, 1:leader, 2:follower") - .baseUnit("healthy") - .register(registry); - } - - private static Properties getZooCFG(JsonNode hostComponent) { - try { - String root = hostComponent.get("root").asText("/opt"); - String stackName = hostComponent.get("stackName").asText("bigtop"); - String stackVerion = hostComponent.get("stackVersion").asText("3.3.0"); - String zookeeperCFG = "usr/lib/zookeeper/conf/zoo.cfg"; - String cfgFullPath = String.format("%s/%s/%s/%s", root, stackName, stackVerion, zookeeperCFG); - FileInputStream fileInputStream = new FileInputStream(cfgFullPath); - Properties properties = new Properties(); - properties.load(fileInputStream); - return properties; - } catch (IOException e) { - log.error("get Zookeeper Config Error", e); - throw new RuntimeException(e); - } - } - - private static JsonNode getHostComponent() { - String cacheDir = Constants.STACK_CACHE_DIR; - ObjectMapper objectMapper = new ObjectMapper(); - try { - String clusterInfo = cacheDir + CacheFiles.CLUSTER_INFO; - String componentInfo = cacheDir + CacheFiles.COMPONENTS_INFO; - File cluster = new File(clusterInfo); - File component = new File(componentInfo); - if (!cluster.exists() || !component.exists()) return null; - JsonNode clusterJson = objectMapper.readTree(cluster); - JsonNode componentJson = objectMapper.readTree(component); - ObjectNode zookeeperServer = (ObjectNode) componentJson.get("zookeeper_server"); - if (null == zookeeperServer || zookeeperServer.isEmpty()) return null; - ((ObjectNode) clusterJson).setAll(zookeeperServer); - return clusterJson; - } catch (IOException e) { - log.error("get cached cluster info error: ", e); - } - return null; - } -} diff --git a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/ComponentStatusServiceGrpcImpl.java b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/ComponentStatusServiceGrpcImpl.java new file mode 100644 index 00000000..f2cba36b --- /dev/null +++ b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/ComponentStatusServiceGrpcImpl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.agent.service; + +import org.apache.bigtop.manager.common.enums.Command; +import org.apache.bigtop.manager.common.message.entity.payload.CommandPayload; +import org.apache.bigtop.manager.common.message.entity.pojo.ScriptInfo; +import org.apache.bigtop.manager.common.shell.ShellResult; +import org.apache.bigtop.manager.common.utils.JsonUtils; +import org.apache.bigtop.manager.grpc.generated.ComponentStatusReply; +import org.apache.bigtop.manager.grpc.generated.ComponentStatusRequest; +import org.apache.bigtop.manager.grpc.generated.ComponentStatusServiceGrpc; +import org.apache.bigtop.manager.stack.core.executor.StackExecutor; + +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; +import net.devh.boot.grpc.server.service.GrpcService; + +@Slf4j +@GrpcService +public class ComponentStatusServiceGrpcImpl extends ComponentStatusServiceGrpc.ComponentStatusServiceImplBase { + + @Override + public void getComponentStatus( + ComponentStatusRequest request, StreamObserver responseObserver) { + + try { + CommandPayload commandPayload = new CommandPayload(); + commandPayload.setCommand(Command.STATUS); + commandPayload.setServiceName(request.getServiceName()); + commandPayload.setComponentName(request.getComponentName()); + commandPayload.setCommandScript(JsonUtils.readFromString(request.getCommandScript(), ScriptInfo.class)); + commandPayload.setRoot(request.getRoot()); + commandPayload.setStackName(request.getStackName()); + commandPayload.setStackVersion(request.getStackVersion()); + ShellResult shellResult = StackExecutor.execute(commandPayload); + + ComponentStatusReply reply = ComponentStatusReply.newBuilder() + .setStatus(shellResult.getExitCode()) + .build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } catch (Exception e) { + log.error("Error getting component status", e); + Status status = Status.UNKNOWN.withDescription(e.getMessage()); + responseObserver.onError(status.asRuntimeException()); + } + } +} diff --git a/bigtop-manager-grpc/src/main/resources/proto/component_status.proto b/bigtop-manager-grpc/src/main/resources/proto/component_status.proto new file mode 100644 index 00000000..06799da2 --- /dev/null +++ b/bigtop-manager-grpc/src/main/resources/proto/component_status.proto @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "org.apache.bigtop.manager.grpc.generated"; +option java_outer_classname = "ComponentStatusProto"; + +service ComponentStatusService { + rpc GetComponentStatus (ComponentStatusRequest) returns (ComponentStatusReply) {} +} + +message ComponentStatusRequest { + string service_name = 1; + string component_name = 2; + string command_script = 3; + // TODO Unnecessary, should be removed in the future + string root = 4; + string stack_name = 5; + string stack_version = 6; +} + +message ComponentStatusReply { + int32 status = 1; +} \ No newline at end of file diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java index f759e16a..30acc430 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java @@ -34,7 +34,6 @@ import lombok.extern.slf4j.Slf4j; import java.lang.reflect.Constructor; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -63,7 +62,8 @@ public static Boolean isChannelAlive(String host) { @SuppressWarnings("unchecked") public static > T getBlockingStub(String host, Class clazz) { - Map> innerMap = BLOCKING_STUBS.computeIfAbsent(host, k -> new HashMap<>()); + Map> innerMap = + BLOCKING_STUBS.computeIfAbsent(host, k -> new ConcurrentHashMap<>()); return (T) innerMap.computeIfAbsent(clazz.getName(), k -> { T instance = T.newStub(getFactory(clazz), getChannel(host)); log.info("Instance: {} created.", k); @@ -73,7 +73,7 @@ public static > T getBlockingStub(String host, @SuppressWarnings("unchecked") public static > T getAsyncStub(String host, Class clazz) { - Map> innerMap = ASYNC_STUBS.computeIfAbsent(host, k -> new HashMap<>()); + Map> innerMap = ASYNC_STUBS.computeIfAbsent(host, k -> new ConcurrentHashMap<>()); return (T) innerMap.computeIfAbsent(clazz.getName(), k -> { T instance = T.newStub(getFactory(clazz), getChannel(host)); log.info("Instance: {} created.", k); @@ -83,7 +83,8 @@ public static > T getAsyncStub(String host, Class @SuppressWarnings("unchecked") public static > T getFutureStub(String host, Class clazz) { - Map> innerMap = FUTURE_STUBS.computeIfAbsent(host, k -> new HashMap<>()); + Map> innerMap = + FUTURE_STUBS.computeIfAbsent(host, k -> new ConcurrentHashMap<>()); return (T) innerMap.computeIfAbsent(clazz.getName(), k -> { T instance = T.newStub(getFactory(clazz), getChannel(host)); log.info("Instance: {} created.", k); diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/ComponentStatusScheduler.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/ComponentStatusScheduler.java new file mode 100644 index 00000000..3bd0b6f4 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/ComponentStatusScheduler.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.scheduler; + +import org.apache.bigtop.manager.common.enums.MaintainState; +import org.apache.bigtop.manager.dao.entity.Cluster; +import org.apache.bigtop.manager.dao.entity.Component; +import org.apache.bigtop.manager.dao.entity.Host; +import org.apache.bigtop.manager.dao.entity.HostComponent; +import org.apache.bigtop.manager.dao.entity.Service; +import org.apache.bigtop.manager.dao.entity.Stack; +import org.apache.bigtop.manager.dao.repository.HostComponentRepository; +import org.apache.bigtop.manager.grpc.generated.ComponentStatusReply; +import org.apache.bigtop.manager.grpc.generated.ComponentStatusRequest; +import org.apache.bigtop.manager.grpc.generated.ComponentStatusServiceGrpc; +import org.apache.bigtop.manager.server.grpc.GrpcClient; + +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; + +import lombok.extern.slf4j.Slf4j; + +import jakarta.annotation.Resource; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Slf4j +@org.springframework.stereotype.Component +public class ComponentStatusScheduler { + + @Resource + private HostComponentRepository hostComponentRepository; + + @Async + @Scheduled(fixedDelay = 30, timeUnit = TimeUnit.SECONDS) + public void execute() { + List hostComponentList = hostComponentRepository.findAll(); + for (HostComponent hostComponent : hostComponentList) { + // Only check services which should be in running + if (!List.of(MaintainState.STARTED, MaintainState.STOPPED).contains(hostComponent.getState())) { + continue; + } + + Host host = hostComponent.getHost(); + Component component = hostComponent.getComponent(); + Service service = component.getService(); + Cluster cluster = host.getCluster(); + Stack stack = cluster.getStack(); + + ComponentStatusRequest request = ComponentStatusRequest.newBuilder() + .setServiceName(service.getServiceName()) + .setComponentName(component.getComponentName()) + .setCommandScript(component.getCommandScript()) + .setRoot(cluster.getRoot()) + .setStackName(stack.getStackName()) + .setStackVersion(stack.getStackVersion()) + .build(); + ComponentStatusServiceGrpc.ComponentStatusServiceBlockingStub blockingStub = GrpcClient.getBlockingStub( + host.getHostname(), ComponentStatusServiceGrpc.ComponentStatusServiceBlockingStub.class); + ComponentStatusReply reply = blockingStub.getComponentStatus(request); + + // Status 0 means the service is running + if (reply.getStatus() == 0 && hostComponent.getState() == MaintainState.STOPPED) { + hostComponent.setState(MaintainState.STARTED); + hostComponentRepository.save(hostComponent); + } + + if (reply.getStatus() != 0 && hostComponent.getState() == MaintainState.STARTED) { + hostComponent.setState(MaintainState.STOPPED); + hostComponentRepository.save(hostComponent); + } + } + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java index 99442468..6561459c 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java @@ -34,6 +34,7 @@ import jakarta.annotation.Resource; import java.util.List; +import java.util.concurrent.TimeUnit; @Slf4j @Component @@ -43,7 +44,7 @@ public class HostInfoScheduler { private HostRepository hostRepository; @Async - @Scheduled(cron = "0/30 * * * * ? ") + @Scheduled(fixedDelay = 30, timeUnit = TimeUnit.SECONDS) public void execute() { List hosts = hostRepository.findAll(); for (Host host : hosts) { diff --git a/bigtop-manager-ui/src/pages/service/index.vue b/bigtop-manager-ui/src/pages/service/index.vue index 3563bafa..c1c1d98c 100644 --- a/bigtop-manager-ui/src/pages/service/index.vue +++ b/bigtop-manager-ui/src/pages/service/index.vue @@ -57,7 +57,7 @@ Started: '#52c41a', Maintained: '#d9d9d9', Uninstalled: '#f0f964', - Stopped: 'ff4d4f' + Stopped: '#ff4d4f' } const links = [ @@ -153,6 +153,7 @@ onMounted(() => { initServiceMeta() + componentStore.resumeIntervalFn() }) watch( diff --git a/bigtop-manager-ui/src/store/component/index.ts b/bigtop-manager-ui/src/store/component/index.ts index 81e79f86..c301f17f 100644 --- a/bigtop-manager-ui/src/store/component/index.ts +++ b/bigtop-manager-ui/src/store/component/index.ts @@ -18,10 +18,12 @@ */ import { defineStore, storeToRefs } from 'pinia' -import { shallowRef, watch } from 'vue' +import { ref, shallowRef, watch } from 'vue' import { useClusterStore } from '@/store/cluster' import { HostComponentVO } from '@/api/component/types.ts' import { getHostComponents } from '@/api/component' +import { Pausable, useIntervalFn } from '@vueuse/core' +import { MONITOR_SCHEDULE_INTERVAL } from '@/utils/constant.ts' export const useComponentStore = defineStore( 'component', @@ -29,6 +31,7 @@ export const useComponentStore = defineStore( const clusterStore = useClusterStore() const { clusterId } = storeToRefs(clusterStore) const hostComponents = shallowRef([]) + const intervalFn = ref() watch(clusterId, async () => { await loadHostComponents() @@ -38,7 +41,23 @@ export const useComponentStore = defineStore( hostComponents.value = await getHostComponents(clusterId.value) } - return { hostComponents, loadHostComponents } + intervalFn.value = useIntervalFn( + async () => { + await loadHostComponents() + }, + MONITOR_SCHEDULE_INTERVAL, + { immediate: false, immediateCallback: true } + ) + + const resumeIntervalFn = () => intervalFn.value?.resume() + const pauseIntervalFn = () => intervalFn.value?.pause() + + return { + hostComponents, + loadHostComponents, + resumeIntervalFn, + pauseIntervalFn + } }, { persist: false } )