From eea4e0b5c99b209037f700337d9afd1f7c0a9f81 Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Wed, 12 Mar 2025 15:24:52 -0700 Subject: [PATCH] test and fix --- .../waged/WagedInstanceCapacity.java | 6 + .../helix/integration/TestWagedNPE.java | 138 +++++++++++++----- 2 files changed, 105 insertions(+), 39 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java index d8380a058b..c9249b5391 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java @@ -201,6 +201,12 @@ public synchronized boolean checkAndReduceInstanceCapacity(String instance, Stri return true; } + if (!_instanceCapacityMap.containsKey(instance)) { + LOG.error("Instance: " + instance + " not found in instance capacity map. Cluster may be using previous " + + "idealState that includes an instance that is no longer part of the cluster."); + return false; + } + Map instanceCapacity = _instanceCapacityMap.get(instance); Map processedCapacity = new HashMap<>(); for (String key : instanceCapacity.keySet()) { diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java b/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java index 57930fa712..bab4be569a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java @@ -2,19 +2,16 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.helix.ConfigAccessor; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; -import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; -import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; -import org.apache.helix.model.ResourceConfig; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.testng.Assert; import org.testng.annotations.BeforeClass; @@ -23,11 +20,15 @@ public class TestWagedNPE extends ZkTestBase { - public static String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster"; - public static int PARTICIPANT_COUNT = 3; - public static List _participants = new ArrayList<>(); - public static ClusterControllerManager _controller; - public static ConfigAccessor _configAccessor; + private final String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster"; + private final int PARTICIPANT_COUNT = 3; + private final int PARTITION_COUNT = 3; + private final int REPLICA_COUNT = 3; + private final int DEFAULT_VERIFIER_TIMEOUT = 15000; + private List _participants = new ArrayList<>(); + private ClusterControllerManager _controller; + private ConfigAccessor _configAccessor; + private BestPossibleExternalViewVerifier _verifier; @BeforeClass public void beforeClass() { @@ -45,60 +46,108 @@ public void beforeClass() { ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); String testCapacityKey = "TestCapacityKey"; clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey)); - clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 100)); + clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 3)); clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey, 1)); + clusterConfig.setPersistBestPossibleAssignment(true); _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + _verifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); } // This test was constructed to capture the bug described in issue 2891 // https://github.com/apache/helix/issues/2891 @Test - public void testNPE() throws Exception { - int numPartition = 3; - BestPossibleExternalViewVerifier verifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); - + public void testNPEonNewResource() { + System.out.println("Start test " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()); // Create 1 WAGED Resource String firstDB = "firstDB"; - _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, "LeaderStandby", - IdealState.RebalanceMode.FULL_AUTO.name(), null); - IdealState idealStateOne = - _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, firstDB); - idealStateOne.setMinActiveReplicas(2); - idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName()); - _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, firstDB, idealStateOne); - _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3); + addWagedResource(firstDB, PARTITION_COUNT, REPLICA_COUNT); // Wait for cluster to converge - Assert.assertTrue(verifier.verifyByPolling()); + Assert.assertTrue(_verifier.verifyByPolling()); // Drop resource _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, firstDB); // Wait for cluster to converge - Assert.assertTrue(verifier.verifyByPolling()); + Assert.assertTrue(_verifier.verifyByPolling()); // add instance - addParticipant("instance_to_add"); + MockParticipantManager instanceToAdd = addParticipant("instance_to_add"); // Wait for cluster to converge - Assert.assertTrue(verifier.verifyByPolling()); + Assert.assertTrue(_verifier.verifyByPolling()); // Add a new resource - String secondDb = "secondDB"; - _configAccessor.setResourceConfig(CLUSTER_NAME, secondDb, new ResourceConfig(secondDb)); - _gSetupTool.addResourceToCluster(CLUSTER_NAME, secondDb, numPartition, "LeaderStandby", - IdealState.RebalanceMode.FULL_AUTO.name(), null); - IdealState idealStateTwo = - _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, secondDb); - idealStateTwo.setMinActiveReplicas(2); - idealStateTwo.setRebalancerClassName(WagedRebalancer.class.getName()); - _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, secondDb, idealStateTwo); - _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, secondDb, 3); + String secondDB = "secondDB"; + addWagedResource(secondDB, PARTITION_COUNT, REPLICA_COUNT); // Confirm cluster can converge. Cluster will not converge if NPE occurs during pipeline run - Assert.assertTrue(verifier.verifyByPolling()); + Assert.assertTrue(_verifier.verifyByPolling()); + + // Reset cluster + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, secondDB); + instanceToAdd.syncStop(); + _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToAdd.getInstanceName())); + Assert.assertTrue(_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME).isEmpty()); + Assert.assertTrue(_verifier.verifyByPolling()); + System.out.println("End test " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()); + } + + @Test(dependsOnMethods = "testNPEonNewResource") + public void testNPEonRebalanceFailure() throws Exception { + System.out.println("Start test " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()); + // Add 1 WAGED resource that will be succesfully placed + String firstDB = "firstDB"; + addWagedResource(firstDB, PARTITION_COUNT, REPLICA_COUNT); + + // Wait for cluster to converge + Assert.assertTrue(_verifier.verifyByPolling()); + + // Add a 2nd WAGED resource that will fail to place + String secondDB = "secondDB"; + addWagedResource(secondDB, PARTITION_COUNT, REPLICA_COUNT); + + // Kill 1 instance + MockParticipantManager instanceToKill = _participants.get(0); + instanceToKill.syncStop(); + Assert.assertTrue(TestHelper.verify(() -> + !_gZkClient.exists("/" + CLUSTER_NAME + "/LIVEINSTANCES/" + instanceToKill.getInstanceName()), + DEFAULT_VERIFIER_TIMEOUT)); + + // Assert that each partition for firstDB has a LEADER replica + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, firstDB) + .getRecord().getMapFields().forEach((partition, partitionStateMap) -> { + Assert.assertFalse(partitionStateMap.containsKey(instanceToKill.getInstanceName())); + Assert.assertTrue(partitionStateMap.containsValue("LEADER")); + }); + + // Drop the dead instance + _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToKill.getInstanceName())); + + // Kill another instance + MockParticipantManager instanceToKill2 = _participants.get(1); + instanceToKill2.syncStop(); + Assert.assertTrue(TestHelper.verify(() -> + !_gZkClient.exists("/" + CLUSTER_NAME + "/LIVEINSTANCES/" + instanceToKill2.getInstanceName()), + DEFAULT_VERIFIER_TIMEOUT)); + + // Assert that each partition for firstDB has a LEADER replica still + Assert.assertTrue(TestHelper.verify( () -> { + AtomicBoolean verified = new AtomicBoolean(true); + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, firstDB) + .getRecord().getMapFields().forEach((partition, partitionStateMap) -> { + boolean result = !partitionStateMap.containsKey(instanceToKill.getInstanceName()) && + !partitionStateMap.containsKey(instanceToKill2.getInstanceName()) && + partitionStateMap.containsValue("LEADER"); + if (!result) { + verified.set(result); + } + }); + return verified.get();}, DEFAULT_VERIFIER_TIMEOUT)); + System.out.println("End test " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()); } public MockParticipantManager addParticipant(String instanceName) { @@ -108,4 +157,15 @@ public MockParticipantManager addParticipant(String instanceName) { _participants.add(participant); return participant; } + + private void addWagedResource(String resourceName, int partitions, int replicas) { + _gSetupTool.addResourceToCluster(CLUSTER_NAME, resourceName, partitions, "LeaderStandby", + IdealState.RebalanceMode.FULL_AUTO.name(), null); + IdealState idealStateOne = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, resourceName); + idealStateOne.setMinActiveReplicas(2); + idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName()); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, resourceName, idealStateOne); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, resourceName, replicas); + } }