Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -379,14 +380,21 @@ public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clust

/**
 * Runs the batch stoppable checks for the given instances without preserving their input order.
 * Delegates to the overload that takes a {@code preserveOrder} flag, passing {@code false}.
 */
public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
    List<String> instances, String jsonContent, Set<String> toBeStoppedInstances)
    throws IOException {
  final boolean preserveOrder = false;
  return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent,
      toBeStoppedInstances, preserveOrder);
}

public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent, Set<String> toBeStoppedInstances,
boolean preserveOrder) throws IOException {
Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
// helix instance check.
List<String> instancesForCustomInstanceLevelChecks =
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks,
toBeStoppedInstances);
toBeStoppedInstances, preserveOrder);
// custom check, includes partition check.
batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
toBeStoppedInstances, finalStoppableChecks, getMapFromJsonPayload(jsonContent));
toBeStoppedInstances, finalStoppableChecks, getMapFromJsonPayload(jsonContent), preserveOrder);
return finalStoppableChecks;
}

Expand Down Expand Up @@ -476,12 +484,19 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl

private List<String> batchHelixInstanceStoppableCheck(String clusterId,
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks,
Set<String> toBeStoppedInstances) {
Set<String> toBeStoppedInstances, boolean preserveOrder) {

// Perform all but min_active replicas check in parallel
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(
Collectors.toMap(Function.identity(), instance -> POOL.submit(
() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances))));
Collectors.toMap(
Function.identity(),
instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances)),
(existing, replacement) -> existing,
// Use LinkedHashMap when preserveOrder is true as we need to preserve the order of instances.
// This is important for addMinActiveReplicaChecks which processes instances sequentially,
// and the order of processing can affect which instances pass the min active replica check
preserveOrder ? LinkedHashMap::new : HashMap::new
));

// Perform min_active replicas check sequentially
addMinActiveReplicaChecks(clusterId, helixInstanceChecks, toBeStoppedInstances);
Expand All @@ -492,7 +507,7 @@ private List<String> batchHelixInstanceStoppableCheck(String clusterId,

private List<String> batchCustomInstanceStoppableCheck(String clusterId, List<String> instances,
Set<String> toBeStoppedInstances, Map<String, StoppableCheck> finalStoppableChecks,
Map<String, String> customPayLoads) {
Map<String, String> customPayLoads, boolean preserveOrder) {
if (instances.isEmpty()) {
// if all instances failed at previous checks, then all following checks are not required.
return instances;
Expand Down Expand Up @@ -521,7 +536,7 @@ private List<String> batchCustomInstanceStoppableCheck(String clusterId, List<St
Map<String, StoppableCheck> clusterLevelCustomCheckResult =
performAggregatedCustomCheck(clusterId, instanceIdsForCustomCheck,
restConfig.getCompleteConfiguredHealthUrl().get(), customPayLoads,
toBeStoppedInstances);
toBeStoppedInstances, preserveOrder);
List<String> instancesForNextCheck = new ArrayList<>();
clusterLevelCustomCheckResult.forEach((instance, stoppableCheck) -> {
addStoppableCheck(finalStoppableChecks, instance, stoppableCheck);
Expand All @@ -538,9 +553,13 @@ private List<String> batchCustomInstanceStoppableCheck(String clusterId, List<St
List<String> instancesForCustomPartitionLevelChecks = instanceIdsForCustomCheck;
if (!_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)) {
Map<String, Future<StoppableCheck>> customInstanceLevelChecks = instances.stream().collect(
Collectors.toMap(Function.identity(), instance -> POOL.submit(
() -> performCustomInstanceCheck(clusterId, instance, restConfig.getBaseUrl(instance),
customPayLoads))));
Collectors.toMap(
Function.identity(),
instance -> POOL.submit(() -> performCustomInstanceCheck(clusterId, instance, restConfig.getBaseUrl(instance), customPayLoads)),
(existing, replacement) -> existing,
// Use LinkedHashMap when preserveOrder is true to maintain the original order of instances
preserveOrder ? LinkedHashMap::new : HashMap::new
));
instancesForCustomPartitionLevelChecks =
filterInstancesForNextCheck(customInstanceLevelChecks, finalStoppableChecks);
}
Expand Down Expand Up @@ -618,12 +637,12 @@ private Map<String, MaintenanceManagementInstanceInfo> batchInstanceHealthCheck(
// this is helix own check
instancesForNext =
batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks,
Collections.emptySet());
Collections.emptySet(), false);
} else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) {
// custom check, includes custom Instance check and partition check.
instancesForNext =
batchCustomInstanceStoppableCheck(clusterId, instancesForNext, Collections.emptySet(),
finalStoppableChecks, healthCheckConfig);
finalStoppableChecks, healthCheckConfig, false);
} else {
throw new UnsupportedOperationException(healthCheck + " is not supported yet!");
}
Expand Down Expand Up @@ -770,8 +789,10 @@ private Map<String, StoppableCheck> performPartitionsCheck(List<String> instance

private Map<String, StoppableCheck> performAggregatedCustomCheck(String clusterId,
List<String> instances, String url, Map<String, String> customPayLoads,
Set<String> toBeStoppedInstances) {
Map<String, StoppableCheck> aggregatedStoppableChecks = new HashMap<>();
Set<String> toBeStoppedInstances, boolean preserveOrder) {
// Use LinkedHashMap when preserveOrder is true to maintain the original order of instances
Map<String, StoppableCheck> aggregatedStoppableChecks = preserveOrder ?
new LinkedHashMap<>() : new HashMap<>();
try {
Map<String, List<String>> customCheckResult =
_customRestClient.getAggregatedStoppableCheck(url, instances, toBeStoppedInstances,
Expand All @@ -784,9 +805,13 @@ private Map<String, StoppableCheck> performAggregatedCustomCheck(String clusterI
}
} catch (IOException ex) {
LOG.error("Custom client side aggregated health check for {} failed.", clusterId, ex);
return instances.stream().collect(Collectors.toMap(Function.identity(),
return instances.stream().collect(Collectors.toMap(
Function.identity(),
instance -> new StoppableCheck(false, Collections.singletonList(instance),
StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK)));
StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK),
(existing, replacement) -> existing,
// Use LinkedHashMap when preserveOrder is true to maintain the original order of instances
preserveOrder ? LinkedHashMap::new : HashMap::new));
}
return aggregatedStoppableChecks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@
import org.apache.helix.rest.server.json.instance.StoppableCheck;
import org.apache.helix.rest.server.resources.helix.InstancesAccessor;

/**
* This class is used to select stoppable instances based on different selection criteria.
* Selection criteria include:
* 1. Zone-based selection - Select instances from a single zone
* 2. Cross-zone selection - Select instances across multiple zones
* 3. Non-zone-based selection - Select instances regardless of zone
*
* For zone-based selection, instances can be ordered either lexicographically (default) or
* by preserving the original input order when preserveOrder is set to true.
*/
public class StoppableInstancesSelector {
// This type does not belong to real HealthCheck failed reason. Also, if we add this type
// to HealthCheck enum, it could introduce more unnecessary check step since the InstanceServiceImpl
Expand Down Expand Up @@ -81,6 +91,26 @@ private StoppableInstancesSelector(String clusterId, List<String> orderOfZone,
*/
public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
    List<String> toBeStoppedInstances) throws IOException {
  // Two-argument form: forward to the three-argument overload with order preservation disabled.
  final boolean preserveOrder = false;
  return getStoppableInstancesInSingleZone(instances, toBeStoppedInstances, preserveOrder);
}

/**
* Evaluates and collects stoppable instances within a specified or determined zone based on the order of zones.
* If _orderOfZone is specified, the method targets the first non-empty zone; otherwise, it targets the zone with
* the highest instance count. The method iterates through instances, performing stoppable checks, and records
* reasons for non-stoppability.
*
* @param instances A list of instances to be evaluated.
* @param toBeStoppedInstances A list of instances presumed to be already stopped
* @param preserveOrder Indicates whether to preserve the original order of instances
* @return An ObjectNode containing:
* - 'stoppableNode': List of instances that can be stopped.
* - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and
* a list of reasons for non-stoppability as the value.
* @throws IOException if the batch stoppable checks performed for the selected instances throw one
*/
public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
List<String> toBeStoppedInstances, boolean preserveOrder) throws IOException {
ObjectNode result = JsonNodeFactory.instance.objectNode();
ArrayNode stoppableInstances =
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
Expand All @@ -89,9 +119,9 @@ public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
Set<String> toBeStoppedInstancesSet = findToBeStoppedInstances(toBeStoppedInstances);

List<String> zoneBasedInstance =
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping(), preserveOrder);
populateStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet, stoppableInstances,
failedStoppableInstances);
failedStoppableInstances, preserveOrder);
processNonexistentInstances(instances, failedStoppableInstances);

return result;
Expand Down Expand Up @@ -128,7 +158,7 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
continue;
}
populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet, stoppableInstances,
failedStoppableInstances);
failedStoppableInstances, false);
}
processNonexistentInstances(instances, failedStoppableInstances);
return result;
Expand Down Expand Up @@ -162,16 +192,16 @@ public ObjectNode getStoppableInstancesNonZoneBased(List<String> instances,
List<String> instancesToCheck = new ArrayList<>(instances);
instancesToCheck.removeAll(nonExistingInstances);
populateStoppableInstances(instancesToCheck, toBeStoppedInstancesSet, stoppableInstances,
failedStoppableInstances);
failedStoppableInstances, false);

return result;
}

private void populateStoppableInstances(List<String> instances, Set<String> toBeStoppedInstances,
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException {
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances, boolean preserveOrder) throws IOException {
Map<String, StoppableCheck> instancesStoppableChecks =
_maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances,
_customizedInput, toBeStoppedInstances);
_customizedInput, toBeStoppedInstances, preserveOrder);

for (Map.Entry<String, StoppableCheck> instanceStoppableCheck : instancesStoppableChecks.entrySet()) {
String instance = instanceStoppableCheck.getKey();
Expand Down Expand Up @@ -251,27 +281,39 @@ public void calculateOrderOfZone(List<String> instances, boolean random) {
* The order of zones can directly come from user input. If user did not specify it, Helix will order
* zones by the number of associated instances in descending order.
*
* @param instances
* @param zoneMapping
* @return
* @param instances List of instances to be considered
* @param zoneMapping Mapping from zone to instances
* @param preserveOrder Indicates whether to preserve the original order of instances
* @return List of instances in the first non-empty zone. If preserveOrder is true, the original order
* of instances is maintained. If preserveOrder is false (default), instances are sorted lexicographically.
* Duplicate entries in the input list are kept in both cases.
*/
private List<String> getZoneBasedInstances(List<String> instances,
Map<String, Set<String>> zoneMapping) {
Map<String, Set<String>> zoneMapping, boolean preserveOrder) {
if (_orderOfZone.isEmpty()) {
return _orderOfZone;
return Collections.emptyList();
}

Set<String> instanceSet = null;
for (String zone : _orderOfZone) {
instanceSet = new TreeSet<>(instances);
Set<String> currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone));
instanceSet.retainAll(currentZoneInstanceSet);
if (instanceSet.size() > 0) {
return new ArrayList<>(instanceSet);
Set<String> currentZoneInstanceSet = zoneMapping.get(zone);
if (currentZoneInstanceSet == null || currentZoneInstanceSet.isEmpty()) {
continue;
}

// Filter instances based on current zone
List<String> filteredInstances = instances.stream()
.filter(currentZoneInstanceSet::contains)
.collect(Collectors.toList());

if (!filteredInstances.isEmpty()) {
// If preserve order is not required, return sorted list
if (!preserveOrder) {
Collections.sort(filteredInstances); // Lexicographical order
}
return filteredInstances;
}
}

return Collections.EMPTY_LIST;
return Collections.emptyList();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId,
@QueryParam("continueOnFailures") boolean continueOnFailures,
@QueryParam("skipZKRead") boolean skipZKRead,
@QueryParam("skipHealthCheckCategories") String skipHealthCheckCategories,
@DefaultValue("false") @QueryParam("random") boolean random, String content) {
@DefaultValue("false") @QueryParam("random") boolean random,
@DefaultValue("false") @QueryParam("preserveOrder") boolean preserveOrder,
String content) {
Command cmd;
try {
cmd = Command.valueOf(command);
Expand Down Expand Up @@ -204,7 +206,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId,
break;
case stoppable:
return batchGetStoppableInstances(clusterId, node, skipZKRead, continueOnFailures,
skipHealthCheckCategorySet, random);
skipHealthCheckCategorySet, random, preserveOrder);
default:
_logger.error("Unsupported command :" + command);
return badRequest("Unsupported command :" + command);
Expand All @@ -222,7 +224,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId,

private Response batchGetStoppableInstances(String clusterId, JsonNode node, boolean skipZKRead,
boolean continueOnFailures, Set<StoppableCheck.Category> skipHealthCheckCategories,
boolean random) throws IOException {
boolean random, boolean preserveOrder) throws IOException {
try {
// TODO: Process input data from the content
// TODO: Implement the logic to automatically detect the selection base. https://github.com/apache/helix/issues/2968#issue-2691677799
Expand Down Expand Up @@ -360,7 +362,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
switch (selectionBase) {
case zone_based:
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances);
result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances, preserveOrder);
break;
case cross_zone_based:
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
Expand Down
Loading
Loading