From ac9120922079087fbcd75691cc0e6b1bef2b2246 Mon Sep 17 00:00:00 2001 From: Zhiguo Wu Date: Fri, 10 Jan 2025 23:34:46 +0800 Subject: [PATCH 1/2] BIGTOP-4324: Update agent cache files before job runs --- .../JobCacheServiceGrpcImpl.java} | 53 +++-- .../src/main/resources/proto/job_cache.proto | 36 ++++ .../JobCacheHelper.java} | 183 +++++++----------- .../server/command/job/AbstractJob.java | 26 +-- .../command/job/cluster/ClusterAddJob.java | 14 +- .../job/component/ComponentAddJob.java | 3 - .../server/command/job/host/HostAddJob.java | 2 - .../command/job/service/ServiceAddJob.java | 3 - .../job/service/ServiceConfigureJob.java | 3 - .../command/stage/CacheFileUpdateStage.java | 82 -------- 10 files changed, 155 insertions(+), 250 deletions(-) rename bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/{executor/CacheFileUpdateCommandExecutor.java => service/JobCacheServiceGrpcImpl.java} (52%) create mode 100644 bigtop-manager-grpc/src/main/resources/proto/job_cache.proto rename bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/{task/CacheFileUpdateTask.java => helper/JobCacheHelper.java} (53%) delete mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/CacheFileUpdateStage.java diff --git a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/CacheFileUpdateCommandExecutor.java b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/JobCacheServiceGrpcImpl.java similarity index 52% rename from bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/CacheFileUpdateCommandExecutor.java rename to bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/JobCacheServiceGrpcImpl.java index 74b014cf..b5b9891c 100644 --- a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/CacheFileUpdateCommandExecutor.java +++ b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/JobCacheServiceGrpcImpl.java @@ -16,24 +16,23 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.bigtop.manager.agent.executor; +package org.apache.bigtop.manager.agent.service; import org.apache.bigtop.manager.common.constants.MessageConstants; import org.apache.bigtop.manager.common.message.entity.payload.CacheMessagePayload; import org.apache.bigtop.manager.common.utils.JsonUtils; import org.apache.bigtop.manager.common.utils.ProjectPathUtils; -import org.apache.bigtop.manager.grpc.generated.CommandType; - -import org.springframework.beans.factory.config.ConfigurableBeanFactory; -import org.springframework.context.annotation.Scope; -import org.springframework.stereotype.Component; +import org.apache.bigtop.manager.grpc.generated.JobCacheReply; +import org.apache.bigtop.manager.grpc.generated.JobCacheRequest; +import org.apache.bigtop.manager.grpc.generated.JobCacheServiceGrpc; +import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; +import net.devh.boot.grpc.server.service.GrpcService; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.text.MessageFormat; import static org.apache.bigtop.manager.common.constants.CacheFiles.CLUSTER_INFO; import static org.apache.bigtop.manager.common.constants.CacheFiles.COMPONENTS_INFO; @@ -43,41 +42,33 @@ import static org.apache.bigtop.manager.common.constants.CacheFiles.USERS_INFO; @Slf4j -@Component -@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) -public class CacheFileUpdateCommandExecutor extends AbstractCommandExecutor { - - @Override - public CommandType getCommandType() { - return CommandType.UPDATE_CACHE_FILES; - } +@GrpcService +public class JobCacheServiceGrpcImpl extends JobCacheServiceGrpc.JobCacheServiceImplBase { @Override - public void doExecute() { - CacheMessagePayload cacheMessagePayload = - JsonUtils.readFromString(commandRequest.getPayload(), CacheMessagePayload.class); + public void save(JobCacheRequest request, StreamObserver responseObserver) { + CacheMessagePayload payload = JsonUtils.readFromString(request.getPayload(), CacheMessagePayload.class); String cacheDir = ProjectPathUtils.getAgentCachePath(); Path p = Paths.get(cacheDir); if (!Files.exists(p)) { try { Files.createDirectories(p); } catch (Exception e) { - log.error("Create directory failed: {}", cacheDir, e); - commandReplyBuilder.setCode(MessageConstants.FAIL_CODE); - commandReplyBuilder.setResult( - MessageFormat.format("Create directory {0}, failed: {1}", cacheDir, e.getMessage())); - return; + responseObserver.onError(e); } } - JsonUtils.writeToFile(cacheDir + CONFIGURATIONS_INFO, cacheMessagePayload.getConfigurations()); - JsonUtils.writeToFile(cacheDir + HOSTS_INFO, cacheMessagePayload.getClusterHostInfo()); - JsonUtils.writeToFile(cacheDir + USERS_INFO, cacheMessagePayload.getUserInfo()); - JsonUtils.writeToFile(cacheDir + COMPONENTS_INFO, cacheMessagePayload.getComponentInfo()); - JsonUtils.writeToFile(cacheDir + REPOS_INFO, cacheMessagePayload.getRepoInfo()); - JsonUtils.writeToFile(cacheDir + CLUSTER_INFO, cacheMessagePayload.getClusterInfo()); + JsonUtils.writeToFile(cacheDir + CONFIGURATIONS_INFO, payload.getConfigurations()); + JsonUtils.writeToFile(cacheDir + HOSTS_INFO, payload.getClusterHostInfo()); + JsonUtils.writeToFile(cacheDir + USERS_INFO, payload.getUserInfo()); + JsonUtils.writeToFile(cacheDir + COMPONENTS_INFO, payload.getComponentInfo()); + JsonUtils.writeToFile(cacheDir + REPOS_INFO, payload.getRepoInfo()); + JsonUtils.writeToFile(cacheDir + CLUSTER_INFO, payload.getClusterInfo()); - commandReplyBuilder.setCode(MessageConstants.SUCCESS_CODE); - commandReplyBuilder.setResult("Successfully cached files"); + JobCacheReply reply = JobCacheReply.newBuilder() + .setCode(MessageConstants.SUCCESS_CODE) + .build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); } } diff --git a/bigtop-manager-grpc/src/main/resources/proto/job_cache.proto b/bigtop-manager-grpc/src/main/resources/proto/job_cache.proto new file mode 100644 index 00000000..660bf153 --- /dev/null +++ b/bigtop-manager-grpc/src/main/resources/proto/job_cache.proto @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "org.apache.bigtop.manager.grpc.generated"; +option java_outer_classname = "JobCacheProto"; + +service JobCacheService { + rpc save (JobCacheRequest) returns (JobCacheReply) {} +} + +message JobCacheRequest { + int64 job_id = 1; + string payload = 2; +} + +message JobCacheReply { + int32 code = 1; +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/CacheFileUpdateTask.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java similarity index 53% rename from bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/CacheFileUpdateTask.java rename to bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java index 42468e4e..cde64e49 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/CacheFileUpdateTask.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java @@ -16,12 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.bigtop.manager.server.command.task; +package org.apache.bigtop.manager.server.command.helper; -import org.apache.bigtop.manager.common.enums.Command; +import org.apache.bigtop.manager.common.constants.MessageConstants; import org.apache.bigtop.manager.common.message.entity.payload.CacheMessagePayload; import org.apache.bigtop.manager.common.message.entity.pojo.ClusterInfo; -import org.apache.bigtop.manager.common.message.entity.pojo.ComponentInfo; import org.apache.bigtop.manager.common.message.entity.pojo.RepoInfo; import org.apache.bigtop.manager.common.utils.JsonUtils; import org.apache.bigtop.manager.dao.po.ClusterPO; @@ -35,8 +34,11 @@ import org.apache.bigtop.manager.dao.repository.HostDao; import org.apache.bigtop.manager.dao.repository.RepoDao; import org.apache.bigtop.manager.dao.repository.ServiceConfigDao; -import org.apache.bigtop.manager.grpc.generated.CommandRequest; -import org.apache.bigtop.manager.grpc.generated.CommandType; +import org.apache.bigtop.manager.grpc.generated.JobCacheReply; +import org.apache.bigtop.manager.grpc.generated.JobCacheRequest; +import org.apache.bigtop.manager.grpc.generated.JobCacheServiceGrpc; +import org.apache.bigtop.manager.server.exception.ServerException; +import org.apache.bigtop.manager.server.grpc.GrpcClient; import org.apache.bigtop.manager.server.holder.SpringContextHolder; import org.apache.bigtop.manager.server.model.converter.RepoConverter; import org.apache.bigtop.manager.server.model.dto.ServiceDTO; @@ -49,59 +51,72 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.apache.bigtop.manager.common.constants.Constants.ALL_HOST_KEY; -public class CacheFileUpdateTask extends AbstractTask { +public class JobCacheHelper { - private ClusterDao clusterDao; - private ServiceConfigDao serviceConfigDao; - private RepoDao repoDao; - private HostDao hostDao; - private ComponentDao componentDao; + private static ClusterDao clusterDao; + private static ServiceConfigDao serviceConfigDao; + private static RepoDao repoDao; + private static HostDao hostDao; + private static ComponentDao componentDao; - private ClusterInfo clusterInfo; - private Map componentInfoMap; - private Map> serviceConfigMap; - private Map> hostMap; - private List repoList; - private Map userMap; + private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false); - public CacheFileUpdateTask(TaskContext taskContext) { - super(taskContext); - } - - @Override - protected void injectBeans() { - super.injectBeans(); + private static void initialize() { + clusterDao = SpringContextHolder.getBean(ClusterDao.class); + serviceConfigDao = SpringContextHolder.getBean(ServiceConfigDao.class); + repoDao = SpringContextHolder.getBean(RepoDao.class); + hostDao = SpringContextHolder.getBean(HostDao.class); + componentDao = SpringContextHolder.getBean(ComponentDao.class); - this.clusterDao = SpringContextHolder.getBean(ClusterDao.class); - this.serviceConfigDao = SpringContextHolder.getBean(ServiceConfigDao.class); - this.repoDao = SpringContextHolder.getBean(RepoDao.class); - this.hostDao = SpringContextHolder.getBean(HostDao.class); - this.componentDao = SpringContextHolder.getBean(ComponentDao.class); + INITIALIZED.set(true); } - @Override - public void beforeRun() { - super.beforeRun(); + public static void saveJobCache(Long clusterId, Long jobId, List hostnames) { + CacheMessagePayload payload = genPayload(clusterId); + JobCacheRequest request = JobCacheRequest.newBuilder() + .setJobId(jobId) + .setPayload(JsonUtils.writeAsString(payload)) + .build(); + List hostPOList = hostDao.findAllByHostnames(hostnames); + List> futures = new ArrayList<>(); + for (HostPO hostPO : hostPOList) { + futures.add(CompletableFuture.supplyAsync(() -> { + JobCacheServiceGrpc.JobCacheServiceBlockingStub stub = GrpcClient.getBlockingStub( + hostPO.getHostname(), + hostPO.getGrpcPort(), + JobCacheServiceGrpc.JobCacheServiceBlockingStub.class); + JobCacheReply reply = stub.save(request); + return reply != null && reply.getCode() == MessageConstants.SUCCESS_CODE; + })); + } - genCaches(); + List results = futures.stream() + .map((future) -> { + try { + return future.get(); + } catch (Exception e) { + return false; + } + }) + .toList(); + + boolean allSuccess = results.stream().allMatch(Boolean::booleanValue); + if (!allSuccess) { + throw new ServerException("Failed to save job cache"); + } } - private void genCaches() { - if (taskContext.getClusterId() == null) { - genEmptyCaches(); - } else { - genFullCaches(); + private static CacheMessagePayload genPayload(Long clusterId) { + if (!INITIALIZED.get()) { + initialize(); } - } - @SuppressWarnings("unchecked") - private void genFullCaches() { - Long clusterId = taskContext.getClusterId(); - List hostnames = (List) taskContext.getProperties().get("hostnames"); ClusterPO clusterPO = clusterDao.findById(clusterId); ComponentQuery componentQuery = @@ -110,14 +125,14 @@ private void genFullCaches() { List serviceConfigPOList = serviceConfigDao.findByClusterId(clusterPO.getId()); List componentPOList = componentDao.findByQuery(componentQuery); List repoPOList = repoDao.findAll(); - List hostPOList = hostDao.findAllByHostnames(hostnames); + List hostPOList = hostDao.findAll(); - clusterInfo = new ClusterInfo(); + ClusterInfo clusterInfo = new ClusterInfo(); clusterInfo.setName(clusterPO.getName()); clusterInfo.setUserGroup(clusterPO.getUserGroup()); clusterInfo.setRootDir(clusterPO.getRootDir()); - serviceConfigMap = new HashMap<>(); + Map> serviceConfigMap = new HashMap<>(); for (ServiceConfigPO serviceConfigPO : serviceConfigPOList) { List> properties = JsonUtils.readFromString(serviceConfigPO.getPropertiesJson()); Map kvMap = properties.stream() @@ -134,7 +149,7 @@ private void genFullCaches() { } } - hostMap = new HashMap<>(); + Map> hostMap = new HashMap<>(); componentPOList.forEach(x -> { if (hostMap.containsKey(x.getName())) { hostMap.get(x.getName()).add(x.getHostname()); @@ -149,81 +164,25 @@ private void genFullCaches() { Set hostNameSet = hostPOList.stream().map(HostPO::getHostname).collect(Collectors.toSet()); hostMap.put(ALL_HOST_KEY, hostNameSet); - repoList = new ArrayList<>(); + List repoList = new ArrayList<>(); repoPOList.forEach(repoPO -> { RepoInfo repoInfo = RepoConverter.INSTANCE.fromPO2Message(repoPO); repoList.add(repoInfo); }); - userMap = new HashMap<>(); + Map userMap = new HashMap<>(); for (StackDTO stackDTO : StackUtils.getAllStacks()) { for (ServiceDTO serviceDTO : StackUtils.getServiceDTOList(stackDTO)) { userMap.put(serviceDTO.getName(), serviceDTO.getUser()); } } - } - - @SuppressWarnings("unchecked") - private void genEmptyCaches() { - List hostnames = (List) taskContext.getProperties().get("hostnames"); - - List repoPOList = repoDao.findAll(); - List hostPOList = hostDao.findAllByHostnames(hostnames); - - componentInfoMap = new HashMap<>(); - serviceConfigMap = new HashMap<>(); - - clusterInfo = new ClusterInfo(); - clusterInfo.setUserGroup(taskContext.getUserGroup()); - clusterInfo.setRootDir(taskContext.getRootDir()); - - hostMap = new HashMap<>(); - Set hostNameSet = hostPOList.stream().map(HostPO::getHostname).collect(Collectors.toSet()); - hostMap.put(ALL_HOST_KEY, hostNameSet); - - repoList = new ArrayList<>(); - repoPOList.forEach(repoPO -> { - RepoInfo repoInfo = RepoConverter.INSTANCE.fromPO2Message(repoPO); - repoList.add(repoInfo); - }); - - userMap = new HashMap<>(); - for (StackDTO stackDTO : StackUtils.getAllStacks()) { - for (ServiceDTO serviceDTO : StackUtils.getServiceDTOList(stackDTO)) { - userMap.put(serviceDTO.getName(), serviceDTO.getUser()); - } - } - } - - @Override - protected Command getCommand() { - return Command.CUSTOM; - } - - @Override - protected String getCustomCommand() { - return "update_cache_files"; - } - - @Override - protected CommandRequest getCommandRequest() { - CacheMessagePayload messagePayload = new CacheMessagePayload(); - messagePayload.setClusterInfo(clusterInfo); - messagePayload.setConfigurations(serviceConfigMap); - messagePayload.setClusterHostInfo(hostMap); - messagePayload.setRepoInfo(repoList); - messagePayload.setUserInfo(userMap); - messagePayload.setComponentInfo(componentInfoMap); - - CommandRequest.Builder builder = CommandRequest.newBuilder(); - builder.setType(CommandType.UPDATE_CACHE_FILES); - builder.setPayload(JsonUtils.writeAsString(messagePayload)); - - return builder.build(); - } - @Override - public String getName() { - return "Update cache files on " + taskContext.getHostname(); + CacheMessagePayload payload = new CacheMessagePayload(); + payload.setClusterInfo(clusterInfo); + payload.setConfigurations(serviceConfigMap); + payload.setClusterHostInfo(hostMap); + payload.setRepoInfo(repoList); + payload.setUserInfo(userMap); + return payload; } } diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java index 1f72ba5d..fb096a13 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java @@ -28,7 +28,7 @@ import org.apache.bigtop.manager.dao.repository.JobDao; import org.apache.bigtop.manager.dao.repository.StageDao; import org.apache.bigtop.manager.dao.repository.TaskDao; -import org.apache.bigtop.manager.server.command.stage.CacheFileUpdateStage; +import org.apache.bigtop.manager.server.command.helper.JobCacheHelper; import org.apache.bigtop.manager.server.command.stage.Stage; import org.apache.bigtop.manager.server.command.stage.StageContext; import org.apache.bigtop.manager.server.command.task.Task; @@ -84,11 +84,6 @@ protected void beforeCreateStages() { protected abstract void createStages(); - protected void createCacheStage() { - StageContext stageContext = StageContext.fromCommandDTO(jobContext.getCommandDTO()); - stages.add(new CacheFileUpdateStage(stageContext)); - } - @Override public void beforeRun() { jobPO.setState(JobState.PROCESSING.getName()); @@ -100,8 +95,18 @@ public void run() { boolean success = true; try { + // Persist job state and required data. beforeRun(); + // Send job cache to agents + List hostnames = stages.stream() + .map(Stage::getStageContext) + .map(StageContext::getHostnames) + .flatMap(List::stream) + .distinct() + .toList(); + JobCacheHelper.saveJobCache(clusterPO.getId(), jobPO.getId(), hostnames); + LinkedBlockingQueue queue = new LinkedBlockingQueue<>(stages); while (!queue.isEmpty()) { Stage stage = queue.poll(); @@ -152,12 +157,9 @@ public void onFailure() { } } } - if (!taskPOList.isEmpty()) { - taskDao.partialUpdateByIds(taskPOList); - } - if (!stagePOList.isEmpty()) { - stageDao.partialUpdateByIds(stagePOList); - } + + taskDao.partialUpdateByIds(taskPOList); + stageDao.partialUpdateByIds(stagePOList); jobDao.partialUpdateById(jobPO); } diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/cluster/ClusterAddJob.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/cluster/ClusterAddJob.java index f8650456..0661b322 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/cluster/ClusterAddJob.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/cluster/ClusterAddJob.java @@ -23,7 +23,6 @@ import org.apache.bigtop.manager.dao.po.TaskPO; import org.apache.bigtop.manager.server.command.job.AbstractJob; import org.apache.bigtop.manager.server.command.job.JobContext; -import org.apache.bigtop.manager.server.command.stage.CacheFileUpdateStage; import org.apache.bigtop.manager.server.command.stage.HostCheckStage; import org.apache.bigtop.manager.server.command.stage.SetupJdkStage; import org.apache.bigtop.manager.server.command.stage.Stage; @@ -54,11 +53,22 @@ protected void injectBeans() { hostService = SpringContextHolder.getBean(HostService.class); } + @Override + protected void beforeCreateStages() { + super.beforeCreateStages(); + + if (jobContext.getRetryFlag()) { + // Cluster already created, but command still doesn't have cluster id + // So we need to find the cluster by name + String clusterName = jobContext.getCommandDTO().getClusterCommand().getName(); + clusterPO = clusterDao.findByName(clusterName); + } + } + @Override protected void createStages() { StageContext stageContext = StageContext.fromCommandDTO(jobContext.getCommandDTO()); stages.add(new HostCheckStage(stageContext)); - stages.add(new CacheFileUpdateStage(stageContext)); stages.add(new SetupJdkStage(stageContext)); } diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentAddJob.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentAddJob.java index c95fd60b..99a3fd01 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentAddJob.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentAddJob.java @@ -49,9 +49,6 @@ protected void injectBeans() { @Override protected void createStages() { - // Update cache files - super.createCacheStage(); - CommandDTO commandDTO = jobContext.getCommandDTO(); Map> componentHostsMap = getComponentHostsMap(); diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/host/HostAddJob.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/host/HostAddJob.java index a10f240a..ba045d31 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/host/HostAddJob.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/host/HostAddJob.java @@ -19,7 +19,6 @@ package org.apache.bigtop.manager.server.command.job.host; import org.apache.bigtop.manager.server.command.job.JobContext; -import org.apache.bigtop.manager.server.command.stage.CacheFileUpdateStage; import org.apache.bigtop.manager.server.command.stage.HostCheckStage; import org.apache.bigtop.manager.server.command.stage.SetupJdkStage; import org.apache.bigtop.manager.server.command.stage.StageContext; @@ -50,7 +49,6 @@ protected void injectBeans() { protected void createStages() { StageContext stageContext = StageContext.fromCommandDTO(jobContext.getCommandDTO()); stages.add(new HostCheckStage(stageContext)); - stages.add(new CacheFileUpdateStage(stageContext)); stages.add(new SetupJdkStage(stageContext)); } diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java index 690f3542..fb4ba777 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java @@ -58,9 +58,6 @@ protected void injectBeans() { @Override protected void createStages() { - // Update cache files - super.createCacheStage(); - CommandDTO commandDTO = jobContext.getCommandDTO(); Map> componentHostsMap = getComponentHostsMap(); diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceConfigureJob.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceConfigureJob.java index f6325685..93c61bba 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceConfigureJob.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceConfigureJob.java @@ -34,9 +34,6 @@ public ServiceConfigureJob(JobContext jobContext) { @Override protected void createStages() { - // Update cache files - super.createCacheStage(); - CommandDTO commandDTO = jobContext.getCommandDTO(); Map> componentHostsMap = getComponentHostsMap(); diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/CacheFileUpdateStage.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/CacheFileUpdateStage.java deleted file mode 100644 index 88c57081..00000000 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/CacheFileUpdateStage.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.bigtop.manager.server.command.stage; - -import org.apache.bigtop.manager.dao.po.HostPO; -import org.apache.bigtop.manager.server.command.task.CacheFileUpdateTask; -import org.apache.bigtop.manager.server.command.task.Task; -import org.apache.bigtop.manager.server.command.task.TaskContext; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class CacheFileUpdateStage extends AbstractStage { - - public CacheFileUpdateStage(StageContext stageContext) { - super(stageContext); - } - - @Override - protected void injectBeans() { - super.injectBeans(); - } - - @Override - protected void beforeCreateTasks() { - List hostnames = new ArrayList<>(); - - if (stageContext.getClusterId() == null) { - hostnames.addAll(stageContext.getHostnames() == null ? List.of() : stageContext.getHostnames()); - } else { - hostnames.addAll(stageContext.getHostnames() == null ? List.of() : stageContext.getHostnames()); - hostnames.addAll(hostDao.findAllByClusterId(stageContext.getClusterId()).stream() - .map(HostPO::getHostname) - .toList()); - } - - stageContext.setHostnames(hostnames); - } - - @Override - protected Task createTask(String hostname) { - TaskContext taskContext = new TaskContext(); - taskContext.setHostname(hostname); - taskContext.setClusterId(stageContext.getClusterId()); - taskContext.setClusterName(stageContext.getClusterName()); - taskContext.setUserGroup(stageContext.getUserGroup()); - taskContext.setRootDir(stageContext.getRootDir()); - taskContext.setServiceName("cluster"); - taskContext.setServiceUser("root"); - taskContext.setComponentName("agent"); - taskContext.setComponentDisplayName("Agent"); - - Map properties = new HashMap<>(); - properties.put("hostnames", stageContext.getHostnames()); - taskContext.setProperties(properties); - - return new CacheFileUpdateTask(taskContext); - } - - @Override - public String getName() { - return "Update cache files"; - } -} From 3f9ded615d88c953547c1489506863ecd9ec2e9b Mon Sep 17 00:00:00 2001 From: Zhiguo Wu Date: Fri, 10 Jan 2025 23:41:29 +0800 Subject: [PATCH 2/2] rename --- .../bigtop/manager/server/command/helper/JobCacheHelper.java | 4 ++-- .../apache/bigtop/manager/server/command/job/AbstractJob.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java index cde64e49..9bea2e67 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java @@ -77,7 +77,7 @@ private static void initialize() { INITIALIZED.set(true); } - public static void saveJobCache(Long clusterId, Long jobId, List hostnames) { + public static void sendJobCache(Long clusterId, Long jobId, List hostnames) { CacheMessagePayload payload = genPayload(clusterId); JobCacheRequest request = JobCacheRequest.newBuilder() .setJobId(jobId) @@ -108,7 +108,7 @@ public static void saveJobCache(Long clusterId, Long jobId, List hostnam boolean allSuccess = results.stream().allMatch(Boolean::booleanValue); if (!allSuccess) { - throw new ServerException("Failed to save job cache"); + throw new ServerException("Failed to send job cache"); } } diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java index fb096a13..02f07ecf 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java @@ -105,7 +105,7 @@ public void run() { .flatMap(List::stream) .distinct() .toList(); - JobCacheHelper.saveJobCache(clusterPO.getId(), jobPO.getId(), hostnames); + JobCacheHelper.sendJobCache(clusterPO.getId(), jobPO.getId(), hostnames); LinkedBlockingQueue queue = new LinkedBlockingQueue<>(stages); while (!queue.isEmpty()) {