From 277a6eb5aa75874eb8119400f124044465f55971 Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Tue, 22 Apr 2025 13:28:44 -0700 Subject: [PATCH 1/3] pin xstreams version, 1.4.21 banned at LI for security reason --- helix-admin-webapp/pom.xml | 2 +- helix-rest/pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/helix-admin-webapp/pom.xml b/helix-admin-webapp/pom.xml index 0bd098f2f0..31502c7367 100644 --- a/helix-admin-webapp/pom.xml +++ b/helix-admin-webapp/pom.xml @@ -90,7 +90,7 @@ com.thoughtworks.xstream xstream - 1.4.21 + 1.4.19 com.fasterxml.jackson.core diff --git a/helix-rest/pom.xml b/helix-rest/pom.xml index 24ddf0d621..e13a8c2c67 100644 --- a/helix-rest/pom.xml +++ b/helix-rest/pom.xml @@ -121,7 +121,7 @@ com.thoughtworks.xstream xstream - 1.4.21 + 1.4.19 com.fasterxml.jackson.core From d59e9543df1329b30ce65cd95e01a7746e347c46 Mon Sep 17 00:00:00 2001 From: Anubhav Agarwal Date: Thu, 27 Mar 2025 11:40:31 +0530 Subject: [PATCH 2/3] Add check for customized resources in isEvacuatedFinished - Add isInstanceDrained method in HelixAdmin - Expose the method via instance update rest end point - Change the conditional checks order in isEvacuateFinished to improve latency --- .../java/org/apache/helix/HelixAdmin.java | 18 ++++++++-- .../rebalancer/CustomRebalancer.java | 9 +++-- .../apache/helix/manager/zk/ZKHelixAdmin.java | 31 ++++++++++------- .../org/apache/helix/common/ZkTestBase.java | 33 +++++++++++++++++++ .../rebalancer/TestCustomRebalancer.java | 3 +- .../rebalancer/TestInstanceOperation.java | 27 +++++++++++++++ .../org/apache/helix/mock/MockHelixAdmin.java | 5 +++ .../server/resources/AbstractResource.java | 1 + .../resources/helix/PerInstanceAccessor.java | 10 ++++++ .../rest/server/TestPerInstanceAccessor.java | 6 ++++ 10 files changed, 125 insertions(+), 18 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index 5c2ef10f20..68bfba86c2 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -800,13 +800,25 @@ Map validateInstancesForWagedRebalance(String clusterName, /** * Return if instance operation 'Evacuate' is finished. * @param clusterName - * @param instancesNames - * @return Return true if there is no current state nor pending message on the instance. + * @param instancesName + * @return Return true if there is no FULL_AUTO or CUSTOMIZED resources in the current state nor + * any pending message on the instance. */ - default boolean isEvacuateFinished(String clusterName, String instancesNames) { + default boolean isEvacuateFinished(String clusterName, String instancesName) { throw new UnsupportedOperationException("isEvacuateFinished is not implemented."); } + /** + * Check to see if instance is drained. + * @param clusterName + * @param instanceName + * @return Return true if there is no FULL_AUTO or CUSTOMIZED resources in the current state nor + * any pending message on the instance. + */ + default boolean isInstanceDrained(String clusterName, String instanceName) { + throw new UnsupportedOperationException("isInstanceDrained is not implemented."); + } + /** * Check to see if swapping between two instances can be completed. Either the swapOut or * swapIn instance can be passed in. diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java index 939d94aedf..861fd4ed63 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java @@ -24,9 +24,11 @@ import java.util.Set; import org.apache.helix.HelixDefinedState; +import org.apache.helix.constants.InstanceConstants; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; @@ -132,10 +134,13 @@ private Map computeCustomizedBestStateForPartition( boolean notInErrorState = currentStateMap != null && !HelixDefinedState.ERROR.toString().equals(currentStateMap.get(instance)); boolean enabled = !disabledInstancesForPartition.contains(instance) && isResourceEnabled; - + InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instance); + boolean isInstanceEvacuated = instanceConfig != null && + instanceConfig.getInstanceOperation().getOperation() == InstanceConstants.InstanceOperation.EVACUATE; // Note: if instance is not live, the mapping for that instance will not show up in // BestPossibleMapping (and ExternalView) - if (assignableLiveInstancesMap.containsKey(instance) && notInErrorState) { + // if instance is evacuated keep the instanceStateMap same as idealStateMap + if ((assignableLiveInstancesMap.containsKey(instance) || isInstanceEvacuated) && notInErrorState) { if (enabled) { instanceStateMap.put(instance, idealStateMap.get(instance)); } else { diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 0d72ac4aaa..29216cf4b4 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -460,12 +460,17 @@ public void setInstanceOperation(String clusterName, String instanceName, @Override public boolean isEvacuateFinished(String clusterName, String instanceName) { - if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) { - InstanceConfig config = getInstanceConfig(clusterName, instanceName); - return config != null && config.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.EVACUATE); + InstanceConfig config = getInstanceConfig(clusterName, instanceName); + if (config == null || config.getInstanceOperation().getOperation() != + InstanceConstants.InstanceOperation.EVACUATE ) { + return false; } - return false; + return !instanceHasCurrentStateOrMessage(clusterName, instanceName); + } + + @Override + public boolean isInstanceDrained(String clusterName, String instanceName) { + return !instanceHasCurrentStateOrMessage(clusterName, instanceName); } /** @@ -721,7 +726,7 @@ public boolean completeSwapIfPossible(String clusterName, String instanceName, @Override public boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName) { - if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) { + if (!instanceHasCurrentStateOrMessage(clusterName, instanceName)) { InstanceConfig config = getInstanceConfig(clusterName, instanceName); return config != null && INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains( config.getInstanceOperation().getOperation()); @@ -757,13 +762,14 @@ public boolean forceKillInstance(String clusterName, String instanceName, String } /** - * Return true if Instance has any current state or pending message. Otherwise, return false if instance is offline, + * Return true if instance has any resource with FULL_AUTO or CUSTOMIZED rebalance mode in current state or + * if instance has any pending message. Otherwise, return false if instance is offline, * instance has no active session, or if instance is online but has no current state or pending message. * @param clusterName * @param instanceName * @return */ - private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName, + private boolean instanceHasCurrentStateOrMessage(String clusterName, String instanceName) { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); @@ -803,13 +809,14 @@ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName, return true; } - // Get set of FULL_AUTO resources + // Get set of FULL_AUTO and CUSTOMIZED resources List idealStates = accessor.getChildValues(keyBuilder.idealStates(), true); - Set fullAutoResources = idealStates != null ? idealStates.stream() - .filter(idealState -> idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO) + Set resources = idealStates != null ? idealStates.stream() + .filter(idealState -> idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO || + idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) .map(IdealState::getResourceName).collect(Collectors.toSet()) : Collections.emptySet(); - return currentStates.stream().anyMatch(fullAutoResources::contains); + return currentStates.stream().anyMatch(resources::contains); } @Override diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java index 278cdfc7ac..d6a761cf7d 100644 --- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -30,12 +30,14 @@ import java.util.Map; import java.util.Set; import java.util.logging.Level; +import java.util.stream.Collectors; import javax.management.MBeanServerConnection; import javax.management.ObjectName; import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixProperty; @@ -75,6 +77,7 @@ import org.apache.helix.tools.ClusterStateVerifier; import org.apache.helix.tools.StateModelConfigGenerator; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.impl.client.ZkClient; import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; @@ -447,6 +450,36 @@ protected void createDBInSemiAuto(ClusterSetup clusterSetup, String clusterName, clusterSetup.getClusterManagementTool().setResourceIdealState(clusterName, dbName, is); } + protected void createResourceInCustomizedMode(ClusterSetup clusterSetup, String clusterName, String resourceName, + Map partitionInstanceMap) { + IdealState idealState = new IdealState(resourceName); + idealState.setNumPartitions(partitionInstanceMap.size()); + idealState.setStateModelDefRef(OnlineOfflineSMD.name); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + partitionInstanceMap.forEach((partitionID, instanceName) -> { + idealState.setPartitionState(resourceName + "_" + partitionID, + instanceName, OnlineOfflineSMD.States.ONLINE.toString()); + }); + clusterSetup.addResourceToCluster(clusterName, resourceName, idealState); + } + + protected void removeAllResourcesFromInstance(MockParticipantManager participant, Set excludeResourceNames) { + RealmAwareZkClient zkClient = participant.getZkClient(); + String clusterName = participant.getClusterName(); + String instanceName = participant.getInstanceName(); + String sessionId = zkClient.getChildren(PropertyPathBuilder.instanceCurrentState(clusterName, instanceName)).get(0); + List resourceNames = zkClient.getChildren( + PropertyPathBuilder.instanceCurrentState(clusterName, instanceName, sessionId) + ); + for (String resourceName : resourceNames) { + if (!excludeResourceNames.contains(resourceName)) { + String resourcePath = PropertyPathBuilder.instanceCurrentState(clusterName, + instanceName, sessionId, resourceName); + zkClient.delete(resourcePath); + } + } + } + /** * Validate there should be always minimal active replica and top state replica for each * partition. diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java index 07a0a5de56..e8c7491af6 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java @@ -25,6 +25,7 @@ import org.apache.helix.controller.rebalancer.CustomRebalancer; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.OnlineOfflineSMD; import org.apache.helix.model.Partition; @@ -70,7 +71,7 @@ public void testDisabledBootstrappingPartitions() { .thenReturn(ImmutableSet.of(instanceName)); when(cache.getAssignableLiveInstances()) .thenReturn(ImmutableMap.of(instanceName, new LiveInstance(instanceName))); - + when(cache.getInstanceConfigMap()).thenReturn(ImmutableMap.of(instanceName, new InstanceConfig(instanceName))); CurrentStateOutput currOutput = new CurrentStateOutput(); ResourceAssignment resourceAssignment = customRebalancer.computeBestPossiblePartitionState(cache, idealState, resource, currOutput); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java index 83b7c01924..98f79bcb29 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -282,6 +282,33 @@ public void testEvacuate() throws Exception { Assert.assertEquals(getEVs(), assignment); } + @Test + public void testEvacuateWithCustomizedResource() throws Exception { + System.out.println("START TestInstanceOperation.testEvacuateWithCustomizedResource() at " + new Date(System.currentTimeMillis())); + for( String resource : _allDBs) { + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, resource); + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + String instanceToEvacuate = _participants.get(0).getInstanceName(); + String customizedDB = "CustomizedTestDB"; + Map partitionInstanceMap = new HashMap<>(); + partitionInstanceMap.put(Integer.valueOf(0), _participants.get(0).getInstanceName()); + createResourceInCustomizedMode(_gSetupTool, CLUSTER_NAME, customizedDB, partitionInstanceMap); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + _gSetupTool.getClusterManagementTool() + .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null); + // evacuated instance + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate)); + _gSetupTool.getClusterManagementTool() + .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null); + // Drop customized DBs in clusterx + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, customizedDB); + createTestDBs(DEFAULT_RESOURCE_DELAY_TIME); + } + @Test(dependsOnMethods = "testEvacuate") public void testRevertEvacuation() throws Exception { System.out.println("START TestInstanceOperation.testRevertEvacuation() at " + new Date(System.currentTimeMillis())); diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java index 5a1a8a5bcb..7f516345ae 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java @@ -577,6 +577,11 @@ public boolean isEvacuateFinished(String clusterName, String instancesNames) { return false; } + @Override + public boolean isInstanceDrained(String clusterName, String instancesNames) { + return false; + } + @Override public boolean canCompleteSwap(String clusterName, String instancesNames) { return false; diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java index 847eb802fa..13cabb1ee4 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java @@ -90,6 +90,7 @@ public enum Command { completeSwapIfPossible, onDemandRebalance, isEvacuateFinished, + isInstanceDrained, setPartitionsToError, forceKillInstance } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java index d82c177962..1a16e3b9e8 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java @@ -508,6 +508,16 @@ public Response updateInstance(@PathParam("clusterId") String clusterId, return serverError(e); } return OK(OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("successful", evacuateFinished))); + case isInstanceDrained: + boolean instanceDrained; + try { + instanceDrained = admin.isInstanceDrained(clusterId, instanceName); + } catch (HelixException e) { + LOG.error(String.format("Encountered error when checking if instance is drained for cluster: " + + "{}, instance: {}", clusterId, instanceName), e); + return serverError(e); + } + return OK(OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("successful", instanceDrained))); case forceKillInstance: boolean instanceForceKilled = admin.forceKillInstance(clusterId, instanceName, reason, instanceOperationSource); if (!instanceForceKilled) { diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java index 32f47baae4..3ed432f00a 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java @@ -580,6 +580,12 @@ public void updateInstance() throws Exception { Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode()); Assert.assertTrue(evacuateFinishedResult.get("successful")); + response = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isInstanceDrained") + .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity); + Map instanceDrainedResult = OBJECT_MAPPER.readValue(response.readEntity(String.class), Map.class); + Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode()); + Assert.assertTrue(instanceDrainedResult.get("successful")); + // test isEvacuateFinished on instance with EVACUATE and no currentState // Create new instance so no currentState or messages assigned to it String test_instance_name = INSTANCE_NAME + "_foo"; From 86abe3f55fcbc7de2bd5a4fa929dcd3d7a104b9f Mon Sep 17 00:00:00 2001 From: Anubhav Agarwal Date: Fri, 9 May 2025 14:35:57 +0530 Subject: [PATCH 3/3] adding check for live instance tag --- .../apache/helix/controller/rebalancer/CustomRebalancer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java index 861fd4ed63..3b2f543e70 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java @@ -135,12 +135,13 @@ private Map computeCustomizedBestStateForPartition( && !HelixDefinedState.ERROR.toString().equals(currentStateMap.get(instance)); boolean enabled = !disabledInstancesForPartition.contains(instance) && isResourceEnabled; InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instance); - boolean isInstanceEvacuated = instanceConfig != null && + boolean hasEvacuatedOp = instanceConfig != null && instanceConfig.getInstanceOperation().getOperation() == InstanceConstants.InstanceOperation.EVACUATE; + boolean isAssignableForCustomizedResource = cache.getLiveInstances().containsKey(instance) && hasEvacuatedOp; // Note: if instance is not live, the mapping for that instance will not show up in // BestPossibleMapping (and ExternalView) // if instance is evacuated keep the instanceStateMap same as idealStateMap - if ((assignableLiveInstancesMap.containsKey(instance) || isInstanceEvacuated) && notInErrorState) { + if ((assignableLiveInstancesMap.containsKey(instance) || isAssignableForCustomizedResource) && notInErrorState) { if (enabled) { instanceStateMap.put(instance, idealStateMap.get(instance)); } else {