Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -47,7 +48,6 @@
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.net.Address;
Expand Down Expand Up @@ -532,8 +532,7 @@ public BalanceResponse balanceRSGroup(String groupName, BalanceRequest request)
Map<String, RegionState> groupRIT = rsGroupGetRegionsInTransition(groupName);
if (groupRIT.size() > 0 && !request.isIgnoreRegionsInTransition()) {
LOG.debug("Not running balancer because {} region(s) in transition: {}", groupRIT.size(),
StringUtils.abbreviate(
master.getAssignmentManager().getRegionStates().getRegionsInTransition().toString(),
StringUtils.abbreviate(master.getAssignmentManager().getRegionsInTransition().toString(),
256));
return responseBuilder.build();
}
Expand Down Expand Up @@ -654,14 +653,12 @@ public void updateConfiguration(String groupName) throws IOException {

private Map<String, RegionState> rsGroupGetRegionsInTransition(String groupName)
throws IOException {
SortedSet<TableName> tablesInGroup = getRSGroupInfo(groupName).getTables();
Map<String, RegionState> rit = Maps.newTreeMap();
AssignmentManager am = master.getAssignmentManager();
for (TableName tableName : getRSGroupInfo(groupName).getTables()) {
for (RegionInfo regionInfo : am.getRegionStates().getRegionsOfTable(tableName)) {
RegionState state = am.getRegionStates().getRegionTransitionState(regionInfo);
if (state != null) {
rit.put(regionInfo.getEncodedName(), state);
}
for (RegionStateNode regionNode : master.getAssignmentManager().getRegionsInTransition()) {
TableName tn = regionNode.getTable();
if (tablesInGroup.contains(tn)) {
rit.put(regionNode.getRegionInfo().getEncodedName(), regionNode.toRegionState());
}
}
return rit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public boolean evaluate() throws Exception {
@Override
public boolean evaluate() throws Exception {
return groupRS.getNumberOfOnlineRegions() < 1
&& master.getAssignmentManager().getRegionStates().getRegionsInTransitionCount() < 1;
&& master.getAssignmentManager().getRegionsInTransitionCount() < 1;
}
});
// Move table to group and wait.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ int limit = 100;
</%args>

<%java>
SortedSet<RegionState> rit = assignmentManager.getRegionStates()
.getRegionsInTransitionOrderedByTimestamp();
SortedSet<RegionState> rit = assignmentManager.getRegionsStateInTransition();
</%java>

<%if !rit.isEmpty() %>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ AssignmentManager assignmentManager = master.getAssignmentManager();
<tr>
<th></th>
<td><% deadServerName %></td>
<td><% deadServerUtil.getTimeOfDeath(deadServerName) %></td>
<td><% new Date(deadServerUtil.getTimeOfDeath(deadServerName)) %></td>
</tr>
<%java>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -115,6 +114,10 @@ public synchronized void finish(ServerName sn) {
}
}

/**
 * Records a time of death for the given server without overwriting an existing record.
 * Delegates to {@code deadServers.putIfAbsent}, so an entry already stored for {@code sn} is
 * left untouched.
 * @param sn the dead server to record
 * @param crashedTime the value to store for {@code sn}; presumably a timestamp in epoch
 *          milliseconds -- TODO confirm against callers
 */
synchronized void putIfAbsent(ServerName sn, long crashedTime) {
this.deadServers.putIfAbsent(sn, crashedTime);
}

/**
 * Returns the number of entries currently held in {@code deadServers}.
 * @return the current size of {@code deadServers}
 */
public synchronized int size() {
return deadServers.size();
}
Expand Down Expand Up @@ -209,9 +212,9 @@ synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts) {
* @param deadServerName the dead server name
* @return the time when the server died, as a timestamp in milliseconds, or 0 if no time of
*         death is recorded for the server
*/
public synchronized Date getTimeOfDeath(final ServerName deadServerName) {
public synchronized long getTimeOfDeath(final ServerName deadServerName) {
Long time = deadServers.get(deadServerName);
return time == null ? null : new Date(time);
return time == null ? 0 : time;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,14 @@ private void finishActiveMasterInitialization()
Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType = procedureExecutor
.getActiveProceduresNoCopy().stream().collect(Collectors.groupingBy(p -> p.getClass()));

// This manager will be started AFTER hbase:meta is confirmed on line and must be initialised
// before assignment manager
// hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table
// state from zookeeper while hbase2 reads it from hbase:meta. Disable if no hbase1 clients.
this.tableStateManager =
this.conf.getBoolean(MirroringTableStateManager.MIRROR_TABLE_STATE_TO_ZK_KEY, true)
? new MirroringTableStateManager(this)
: new TableStateManager(this);
// Create Assignment Manager
this.assignmentManager = createAssignmentManager(this, masterRegion);
this.assignmentManager.start();
Expand All @@ -992,16 +1000,10 @@ private void finishActiveMasterInitialization()
// TODO: Generate the splitting and live Set in one pass instead of two as we currently do.
this.regionServerTracker.upgrade(
procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
.map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
.map(p -> (ServerCrashProcedure) p).collect(
Collectors.toMap(ServerCrashProcedure::getServerName, Procedure::getSubmittedTime)),
Sets.union(rsListStorage.getAll(), walManager.getLiveServersFromWALDir()),
walManager.getSplittingServersFromWALDir());
// This manager will be started AFTER hbase:meta is confirmed on line.
// hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table
// state from zookeeper while hbase2 reads it from hbase:meta. Disable if no hbase1 clients.
this.tableStateManager =
this.conf.getBoolean(MirroringTableStateManager.MIRROR_TABLE_STATE_TO_ZK_KEY, true)
? new MirroringTableStateManager(this)
: new TableStateManager(this);

startupTaskGroup.addTask("Initializing ZK system trackers");
initializeZKBasedSystemTrackers();
Expand Down Expand Up @@ -1910,7 +1912,7 @@ private void balanceThrottling(long nextBalanceStartTime, int maxRegionsInTransi
// But if there are zero regions in transition, it can skip sleep to speed up.
while (
!interrupted && EnvironmentEdgeManager.currentTime() < nextBalanceStartTime
&& this.assignmentManager.getRegionStates().hasRegionsInTransition()
&& this.assignmentManager.getRegionTransitScheduledCount() > 0
) {
try {
Thread.sleep(100);
Expand All @@ -1922,8 +1924,7 @@ private void balanceThrottling(long nextBalanceStartTime, int maxRegionsInTransi
// Throttling by max number regions in transition
while (
!interrupted && maxRegionsInTransition > 0
&& this.assignmentManager.getRegionStates().getRegionsInTransitionCount()
>= maxRegionsInTransition
&& this.assignmentManager.getRegionTransitScheduledCount() >= maxRegionsInTransition
&& EnvironmentEdgeManager.currentTime() <= cutoffTime
) {
try {
Expand Down Expand Up @@ -2004,7 +2005,7 @@ public BalanceResponse balance(BalanceRequest request) throws IOException {

synchronized (this.balancer) {
// Only allow one balance run at a time.
if (this.assignmentManager.hasRegionsInTransition()) {
if (this.assignmentManager.getRegionTransitScheduledCount() > 0) {
List<RegionStateNode> regionsInTransition = assignmentManager.getRegionsInTransition();
// if hbase:meta region is in transition, result of assignment cannot be recorded
// ignore the force flag in that case
Expand All @@ -2019,7 +2020,8 @@ public BalanceResponse balance(BalanceRequest request) throws IOException {

if (!request.isIgnoreRegionsInTransition() || metaInTransition) {
LOG.info("Not running balancer (ignoreRIT=false" + ", metaRIT=" + metaInTransition
+ ") because " + regionsInTransition.size() + " region(s) in transition: " + toPrint
+ ") because " + assignmentManager.getRegionTransitScheduledCount()
+ " region(s) are scheduled to transit " + toPrint
+ (truncated ? "(truncated list)" : ""));
return responseBuilder.build();
}
Expand Down Expand Up @@ -2152,7 +2154,7 @@ public boolean normalizeRegions(final NormalizeTableFilterParams ntfp,
if (skipRegionManagementAction("region normalizer")) {
return false;
}
if (assignmentManager.hasRegionsInTransition()) {
if (assignmentManager.getRegionTransitScheduledCount() > 0) {
return false;
}

Expand Down Expand Up @@ -2929,7 +2931,7 @@ public ClusterMetrics getClusterMetricsWithoutCoprocessor(EnumSet<Option> option
case REGIONS_IN_TRANSITION: {
if (assignmentManager != null) {
builder.setRegionsInTransition(
assignmentManager.getRegionStates().getRegionsStateInTransition());
new ArrayList<>(assignmentManager.getRegionsStateInTransition()));
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -116,22 +117,23 @@ private RegionServerInfo getServerInfo(ServerName serverName)
* In this method, we will also construct the region server sets in {@link ServerManager}. If a
* region server is dead between the crash of the previous master instance and the start of the
* current master instance, we will schedule a SCP for it. This is done in
* {@link ServerManager#findDeadServersAndProcess(Set, Set)}, we call it here under the lock
* {@link ServerManager#findDeadServersAndProcess(Map, Set)}, we call it here under the lock
* protection to prevent concurrency issues with server expiration operation.
* @param deadServersFromPE the region servers which already have SCP associated.
* @param liveServersBeforeRestart the live region servers we recorded before master restarts.
* @param splittingServersFromWALDir Servers whose WALs are being actively 'split'.
*/
public void upgrade(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersBeforeRestart,
Set<ServerName> splittingServersFromWALDir) throws KeeperException, IOException {
public void upgrade(Map<ServerName, Long> deadServersFromPE,
Set<ServerName> liveServersBeforeRestart, Set<ServerName> splittingServersFromWALDir)
throws KeeperException, IOException {
LOG.info(
"Upgrading RegionServerTracker to active master mode; {} have existing"
+ "ServerCrashProcedures, {} possibly 'live' servers, and {} 'splitting'.",
deadServersFromPE.size(), liveServersBeforeRestart.size(), splittingServersFromWALDir.size());
// deadServersFromPE is made from a list of outstanding ServerCrashProcedures.
// splittingServersFromWALDir are being actively split -- the directory in the FS ends in
// '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not.
splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s))
splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.containsKey(s))
.forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
// create ServerNode for all possible live servers from wal directory
liveServersBeforeRestart
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics
* @param deadServersFromPE the region servers which already have a SCP associated.
* @param liveServersFromWALDir the live region servers from wal directory.
*/
void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
void findDeadServersAndProcess(Map<ServerName, Long> deadServersFromPE,
Set<ServerName> liveServersFromWALDir) {
deadServersFromPE.forEach(deadservers::putIfAbsent);
liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
Expand Down
Loading