diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java index 8d5d025b3c..18d1abd60c 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -379,14 +380,21 @@ public Map batchGetInstancesStoppableChecks(String clust public Map batchGetInstancesStoppableChecks(String clusterId, List instances, String jsonContent, Set toBeStoppedInstances) throws IOException { + return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent, toBeStoppedInstances, + false); + } + + public Map batchGetInstancesStoppableChecks(String clusterId, + List instances, String jsonContent, Set toBeStoppedInstances, + boolean preserveOrder) throws IOException { Map finalStoppableChecks = new HashMap<>(); // helix instance check. List instancesForCustomInstanceLevelChecks = batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks, - toBeStoppedInstances); + toBeStoppedInstances, preserveOrder); // custom check, includes partition check. 
batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks, - toBeStoppedInstances, finalStoppableChecks, getMapFromJsonPayload(jsonContent)); + toBeStoppedInstances, finalStoppableChecks, getMapFromJsonPayload(jsonContent), preserveOrder); return finalStoppableChecks; } @@ -476,12 +484,19 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl private List batchHelixInstanceStoppableCheck(String clusterId, Collection instances, Map finalStoppableChecks, - Set toBeStoppedInstances) { + Set toBeStoppedInstances, boolean preserveOrder) { // Perform all but min_active replicas check in parallel Map> helixInstanceChecks = instances.stream().collect( - Collectors.toMap(Function.identity(), instance -> POOL.submit( - () -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances)))); + Collectors.toMap( + Function.identity(), + instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances)), + (existing, replacement) -> existing, + // Use LinkedHashMap when preserveOrder is true as we need to preserve the order of instances. + // This is important for addMinActiveReplicaChecks which processes instances sequentially, + // and the order of processing can affect which instances pass the min active replica check + preserveOrder ? LinkedHashMap::new : HashMap::new + )); // Perform min_active replicas check sequentially addMinActiveReplicaChecks(clusterId, helixInstanceChecks, toBeStoppedInstances); @@ -492,7 +507,7 @@ private List batchHelixInstanceStoppableCheck(String clusterId, private List batchCustomInstanceStoppableCheck(String clusterId, List instances, Set toBeStoppedInstances, Map finalStoppableChecks, - Map customPayLoads) { + Map customPayLoads, boolean preserveOrder) { if (instances.isEmpty()) { // if all instances failed at previous checks, then all following checks are not required. 
return instances; @@ -521,7 +536,7 @@ private List batchCustomInstanceStoppableCheck(String clusterId, List clusterLevelCustomCheckResult = performAggregatedCustomCheck(clusterId, instanceIdsForCustomCheck, restConfig.getCompleteConfiguredHealthUrl().get(), customPayLoads, - toBeStoppedInstances); + toBeStoppedInstances, preserveOrder); List instancesForNextCheck = new ArrayList<>(); clusterLevelCustomCheckResult.forEach((instance, stoppableCheck) -> { addStoppableCheck(finalStoppableChecks, instance, stoppableCheck); @@ -538,9 +553,13 @@ private List batchCustomInstanceStoppableCheck(String clusterId, List instancesForCustomPartitionLevelChecks = instanceIdsForCustomCheck; if (!_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)) { Map> customInstanceLevelChecks = instances.stream().collect( - Collectors.toMap(Function.identity(), instance -> POOL.submit( - () -> performCustomInstanceCheck(clusterId, instance, restConfig.getBaseUrl(instance), - customPayLoads)))); + Collectors.toMap( + Function.identity(), + instance -> POOL.submit(() -> performCustomInstanceCheck(clusterId, instance, restConfig.getBaseUrl(instance), customPayLoads)), + (existing, replacement) -> existing, + // Use LinkedHashMap when preserveOrder is true to maintain the original order of instances + preserveOrder ? LinkedHashMap::new : HashMap::new + )); instancesForCustomPartitionLevelChecks = filterInstancesForNextCheck(customInstanceLevelChecks, finalStoppableChecks); } @@ -618,12 +637,12 @@ private Map batchInstanceHealthCheck( // this is helix own check instancesForNext = batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks, - Collections.emptySet()); + Collections.emptySet(), false); } else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) { // custom check, includes custom Instance check and partition check. 
instancesForNext = batchCustomInstanceStoppableCheck(clusterId, instancesForNext, Collections.emptySet(), - finalStoppableChecks, healthCheckConfig); + finalStoppableChecks, healthCheckConfig, false); } else { throw new UnsupportedOperationException(healthCheck + " is not supported yet!"); } @@ -770,8 +789,10 @@ private Map performPartitionsCheck(List instance private Map performAggregatedCustomCheck(String clusterId, List instances, String url, Map customPayLoads, - Set toBeStoppedInstances) { - Map aggregatedStoppableChecks = new HashMap<>(); + Set toBeStoppedInstances, boolean preserveOrder) { + // Use LinkedHashMap when preserveOrder is true to maintain the original order of instances + Map aggregatedStoppableChecks = preserveOrder ? + new LinkedHashMap<>() : new HashMap<>(); try { Map> customCheckResult = _customRestClient.getAggregatedStoppableCheck(url, instances, toBeStoppedInstances, @@ -784,9 +805,13 @@ private Map performAggregatedCustomCheck(String clusterI } } catch (IOException ex) { LOG.error("Custom client side aggregated health check for {} failed.", clusterId, ex); - return instances.stream().collect(Collectors.toMap(Function.identity(), + return instances.stream().collect(Collectors.toMap( + Function.identity(), instance -> new StoppableCheck(false, Collections.singletonList(instance), - StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK))); + StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK), + (existing, replacement) -> existing, + // Use LinkedHashMap when preserveOrder is true to maintain the original order of instances + preserveOrder ? 
LinkedHashMap::new : HashMap::new)); } return aggregatedStoppableChecks; } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java index 8916991008..8e36b371db 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java @@ -42,6 +42,16 @@ import org.apache.helix.rest.server.json.instance.StoppableCheck; import org.apache.helix.rest.server.resources.helix.InstancesAccessor; +/** + * This class is used to select stoppable instances based on different selection criteria. + * Selection criteria include: + * 1. Zone-based selection - Select instances from a single zone + * 2. Cross-zone selection - Select instances across multiple zones + * 3. Non-zone-based selection - Select instances regardless of zone + * + * For zone-based selection, instances can be ordered either lexicographically (default) or + * by preserving the original input order when preserveOrder is set to true. + */ public class StoppableInstancesSelector { // This type does not belong to real HealthCheck failed reason. Also, if we add this type // to HealthCheck enum, it could introduce more unnecessary check step since the InstanceServiceImpl @@ -81,6 +91,26 @@ private StoppableInstancesSelector(String clusterId, List orderOfZone, */ public ObjectNode getStoppableInstancesInSingleZone(List instances, List toBeStoppedInstances) throws IOException { + return getStoppableInstancesInSingleZone(instances, toBeStoppedInstances, false); + } + + /** + * Evaluates and collects stoppable instances within a specified or determined zone based on the order of zones. 
+ * If _orderOfZone is specified, the method targets the first non-empty zone; otherwise, it targets the zone with + * the highest instance count. The method iterates through instances, performing stoppable checks, and records + * reasons for non-stoppability. + * + * @param instances A list of instances to be evaluated. + * @param toBeStoppedInstances A list of instances presumed to be already stopped + * @param preserveOrder Indicates whether to preserve the original order of instances + * @return An ObjectNode containing: + * - 'instance_stoppable_parallel': List of instances that can be stopped. + * - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and + * a list of reasons for non-stoppability as the value. + * @throws IOException + */ + public ObjectNode getStoppableInstancesInSingleZone(List instances, + List toBeStoppedInstances, boolean preserveOrder) throws IOException { ObjectNode result = JsonNodeFactory.instance.objectNode(); ArrayNode stoppableInstances = result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); @@ -89,9 +119,9 @@ public ObjectNode getStoppableInstancesInSingleZone(List instances, Set toBeStoppedInstancesSet = findToBeStoppedInstances(toBeStoppedInstances); List zoneBasedInstance = - getZoneBasedInstances(instances, _clusterTopology.toZoneMapping()); + getZoneBasedInstances(instances, _clusterTopology.toZoneMapping(), preserveOrder); populateStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet, stoppableInstances, - failedStoppableInstances); + failedStoppableInstances, preserveOrder); processNonexistentInstances(instances, failedStoppableInstances); return result; @@ -128,7 +158,7 @@ public ObjectNode getStoppableInstancesCrossZones(List instances, continue; } populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet, stoppableInstances, - failedStoppableInstances); + failedStoppableInstances, false); } processNonexistentInstances(instances, 
failedStoppableInstances); return result; @@ -162,16 +192,16 @@ public ObjectNode getStoppableInstancesNonZoneBased(List instances, List instancesToCheck = new ArrayList<>(instances); instancesToCheck.removeAll(nonExistingInstances); populateStoppableInstances(instancesToCheck, toBeStoppedInstancesSet, stoppableInstances, - failedStoppableInstances); + failedStoppableInstances, false); return result; } private void populateStoppableInstances(List instances, Set toBeStoppedInstances, - ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException { + ArrayNode stoppableInstances, ObjectNode failedStoppableInstances, boolean preserveOrder) throws IOException { Map instancesStoppableChecks = _maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances, - _customizedInput, toBeStoppedInstances); + _customizedInput, toBeStoppedInstances, preserveOrder); for (Map.Entry instanceStoppableCheck : instancesStoppableChecks.entrySet()) { String instance = instanceStoppableCheck.getKey(); @@ -251,27 +281,39 @@ public void calculateOrderOfZone(List instances, boolean random) { * The order of zones can directly come from user input. If user did not specify it, Helix will order * zones by the number of associated instances in descending order. * - * @param instances - * @param zoneMapping - * @return + * @param instances List of instances to be considered + * @param zoneMapping Mapping from zone to instances + * @param preserveOrder Indicates whether to preserve the original order of instances + * @return List of instances in the first non-empty zone. If preserveOrder is true, the original order + * of instances is maintained. If preserveOrder is false (default), instances are sorted lexicographically. 
*/ private List getZoneBasedInstances(List instances, - Map> zoneMapping) { + Map> zoneMapping, boolean preserveOrder) { if (_orderOfZone.isEmpty()) { - return _orderOfZone; + return Collections.emptyList(); } - Set instanceSet = null; for (String zone : _orderOfZone) { - instanceSet = new TreeSet<>(instances); - Set currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone)); - instanceSet.retainAll(currentZoneInstanceSet); - if (instanceSet.size() > 0) { - return new ArrayList<>(instanceSet); + Set currentZoneInstanceSet = zoneMapping.get(zone); + if (currentZoneInstanceSet == null || currentZoneInstanceSet.isEmpty()) { + continue; + } + + // Filter instances based on current zone + List filteredInstances = instances.stream() + .filter(currentZoneInstanceSet::contains) + .collect(Collectors.toList()); + + if (!filteredInstances.isEmpty()) { + // If preserve order is not required, return sorted list + if (!preserveOrder) { + Collections.sort(filteredInstances); // Lexicographical order + } + return filteredInstances; } } - return Collections.EMPTY_LIST; + return Collections.emptyList(); } /** diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java index 1d6e249db9..812f8d349a 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java @@ -159,7 +159,9 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId, @QueryParam("continueOnFailures") boolean continueOnFailures, @QueryParam("skipZKRead") boolean skipZKRead, @QueryParam("skipHealthCheckCategories") String skipHealthCheckCategories, - @DefaultValue("false") @QueryParam("random") boolean random, String content) { + @DefaultValue("false") @QueryParam("random") boolean random, + @DefaultValue("false") 
@QueryParam("preserveOrder") boolean preserveOrder, + String content) { Command cmd; try { cmd = Command.valueOf(command); @@ -204,7 +206,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId, break; case stoppable: return batchGetStoppableInstances(clusterId, node, skipZKRead, continueOnFailures, - skipHealthCheckCategorySet, random); + skipHealthCheckCategorySet, random, preserveOrder); default: _logger.error("Unsupported command :" + command); return badRequest("Unsupported command :" + command); @@ -222,7 +224,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId, private Response batchGetStoppableInstances(String clusterId, JsonNode node, boolean skipZKRead, boolean continueOnFailures, Set skipHealthCheckCategories, - boolean random) throws IOException { + boolean random, boolean preserveOrder) throws IOException { try { // TODO: Process input data from the content // TODO: Implement the logic to automatically detect the selection base. 
https://github.com/apache/helix/issues/2968#issue-2691677799 @@ -360,7 +362,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo switch (selectionBase) { case zone_based: stoppableInstancesSelector.calculateOrderOfZone(instances, random); - result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances); + result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances, preserveOrder); break; case cross_zone_based: stoppableInstancesSelector.calculateOrderOfZone(instances, random); diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java index 7fe4bae288..5c06aef32a 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java @@ -624,6 +624,75 @@ public void testMultipleReplicasInSameMZ() throws Exception { System.out.println("End test :" + TestHelper.getTestMethodName()); } + @DataProvider(name = "preserveOrderProvider") + public Object[][] preserveOrderProvider() { + return new Object[][] { + { true }, + { false } + }; + } + + @Test(dataProvider = "preserveOrderProvider", + dependsOnMethods = "testMultipleReplicasInSameMZ" + ) + public void testMultipleReplicasInSameMZWithPreserveOrder(boolean preserveOrder) throws Exception { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + // Create SemiAuto DB so that we can control assignment + String testDb = TestHelper.getTestMethodName() + "_resource_" + preserveOrder; + _gSetupTool.getClusterManagementTool().addResource(STOPPABLE_CLUSTER2, testDb, 3, "MasterSlave", + IdealState.RebalanceMode.SEMI_AUTO.toString()); + _gSetupTool.getClusterManagementTool().rebalance(STOPPABLE_CLUSTER2, testDb, 3); + + // Manually set ideal state to have the 3 replicas 
assigned to 3 instances all in the same zone + List preferenceList = Arrays.asList("instance0", "instance1", "instance2"); + IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(STOPPABLE_CLUSTER2, testDb); + for (String p : is.getPartitionSet()) { + is.setPreferenceList(p, preferenceList); + } + is.setMinActiveReplicas(2); + _gSetupTool.getClusterManagementTool().setResourceIdealState(STOPPABLE_CLUSTER2, testDb, is); + + // Wait for assignments to take place + BestPossibleExternalViewVerifier verifier = + new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER2).setZkAddr(ZK_ADDR).build(); + Assert.assertTrue(verifier.verifyByPolling()); + + // Run stoppable check against the 3 instances where SemiAuto DB was assigned + String content = + String.format("{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance2", "instance0"); + Response response = new JerseyUriRequestBuilder(String.format( + "clusters/%s/instances?command=stoppable&skipHealthCheckCategories=%s&preserveOrder=%s", + STOPPABLE_CLUSTER2, + "CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK", + preserveOrder)) + .post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + String stoppableNode = "instance0"; + List nonStoppableNodes = Arrays.asList("instance1", "instance2"); + if (preserveOrder) { + stoppableNode = "instance1"; + nonStoppableNodes = Arrays.asList("instance0", "instance2"); + } + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertTrue(Collections.singleton(stoppableNode).equals(stoppableSet)); + + // Next 2 instances should fail stoppable due to MIN_ACTIVE_REPLICA_CHECK_FAILED + JsonNode 
nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Assert.assertFalse(getStringSet(nonStoppableInstances, stoppableNode) + .contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertTrue(getStringSet(nonStoppableInstances, nonStoppableNodes.get(0)) + .contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertTrue(getStringSet(nonStoppableInstances, nonStoppableNodes.get(1)) + .contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + @Test(dependsOnMethods = "testMultipleReplicasInSameMZ") public void testSkipClusterLevelHealthCheck() throws IOException { System.out.println("Start test :" + TestHelper.getTestMethodName());