Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -506,41 +507,75 @@ public String getRootTabletLocation() {
return loc.getHostPort();
}

/**
* Returns the location(s) of the accumulo manager and any redundant servers.
*
* @return a list of locations in "hostname:port" form
*/
public List<String> getManagerLocations() {
public String getPrimaryManagerLocation() {
ensureOpen();
var zLockManagerPath =
ServiceLock.path(Constants.ZROOT + "/" + getInstanceID() + Constants.ZMANAGER_LOCK);

OpTimer timer = null;

if (log.isTraceEnabled()) {
log.trace("tid={} Looking up manager location in zookeeper at {}.",
log.trace("tid={} Looking up primary manager location in zookeeper at {}.",
Thread.currentThread().getId(), zLockManagerPath);
timer = new OpTimer().start();
}

Optional<ServiceLockData> sld = zooCache.getLockData(zLockManagerPath);
String location = null;
if (sld.isPresent()) {
location = sld.orElseThrow().getAddressString(ThriftService.MANAGER);
location = sld.orElseThrow().getAddressString(ThriftService.FATE);
}

if (timer != null) {
timer.stop();
log.trace("tid={} Found manager at {} in {}", Thread.currentThread().getId(),
log.trace("tid={} Found primary manager at {} in {}", Thread.currentThread().getId(),
(location == null ? "null" : location), String.format("%.3f secs", timer.scale(SECONDS)));
}

if (location == null) {
return Collections.emptyList();
return location;
}

/**
* Returns the location(s) of the accumulo manager and any redundant servers.
*
* @return a list of locations in "hostname:port" form
*/
public List<String> getManagerLocations() {
ensureOpen();

List<String> locations = new ArrayList<>();

var zLockManagerPath =
ServiceLock.path(Constants.ZROOT + "/" + getInstanceID() + Constants.ZMANAGERS);

OpTimer timer = null;

if (log.isTraceEnabled()) {
log.trace("tid={} Looking up manager locations in zookeeper at {}.",
Thread.currentThread().getId(), zLockManagerPath);
timer = new OpTimer().start();
}

for (String manager : zooCache
.getChildren(Constants.ZROOT + "/" + getInstanceID() + Constants.ZMANAGERS)) {
if (manager.contains(":")) {
var zLocPath = ServiceLock
.path(Constants.ZROOT + "/" + getInstanceID() + Constants.ZMANAGERS + "/" + manager);
Optional<ServiceLockData> sld = zooCache.getLockData(zLocPath);
if (sld.isPresent()) {
locations.add(sld.orElseThrow().getAddressString(ThriftService.MANAGER));
}
}
}

if (timer != null) {
timer.stop();
log.trace("tid={} Found managers at {} in {}", Thread.currentThread().getId(),
(locations == null ? "null" : locations),
String.format("%.3f secs", timer.scale(SECONDS)));
}

return Collections.singletonList(location);
return locations;
}

/**
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ public enum Property {
"Properties in this category affect the behavior of the manager server.", "2.1.0"),
MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT,
"The port used for handling client connections on the manager.", "1.3.5"),
MANAGER_PORTSEARCH("manager.port.search", "false", PropertyType.BOOLEAN,
"If the manager.port.client is in use, search higher ports until one is available.", "3.1.0"),
MANAGER_TABLET_BALANCER("manager.tablet.balancer",
"org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME,
"The balancer class that accumulo will use to make tablet assignment and "
Expand Down Expand Up @@ -371,6 +373,19 @@ public enum Property {
"Allow tablets for the " + MetadataTable.NAME
+ " table to be suspended via table.suspend.duration.",
"1.8.0"),
MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT("manager.startup.manager.avail.min.count", "0",
PropertyType.COUNT,
"Minimum number of managers that need to be registered before a manager will start. A value "
+ "greater than 0 is useful when multiple managers are supposed to be running on startup. "
+ "When set to 0 or less, no blocking occurs. Default is 0 (disabled).",
"3.1.0"),
MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT("manager.startup.manager.avail.max.wait", "0",
PropertyType.TIMEDURATION,
"Maximum time manager will wait for manager available threshold "
+ "to be reached before continuing. When set to 0 or less, will block "
+ "indefinitely. Default is 0 to block indefinitely. Only valid when manager available "
+ "threshold is set greater than 1.",
"3.1.0"),
MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT("manager.startup.tserver.avail.min.count", "0",
PropertyType.COUNT,
"Minimum number of tservers that need to be registered before manager will "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ public LockID(String root, String serializedLID) {
if (lastSlash == 0) {
path = root;
} else {
path = root + "/" + sa[0].substring(0, lastSlash);
path = root;
if (!sa[0].startsWith("/")) {
path += "/";
}
Comment on lines +76 to +78
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to add this because for some reason sa[0] starts with a / for the MANAGERS locks.

path += sa[0].substring(0, lastSlash);
}
node = sa[0].substring(lastSlash + 1);
eid = new BigInteger(sa[1], 16).longValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

public class ClusterServerConfiguration {

private int numManagers = 1;
private final Map<String,Integer> compactors;
private final Map<String,Integer> sservers;
private final Map<String,Integer> tservers;
Expand All @@ -36,7 +37,7 @@ public class ClusterServerConfiguration {
* in the default resource group
*/
public ClusterServerConfiguration() {
this(1, 1, 2);
this(1, 1, 1, 2);
}

/**
Expand All @@ -46,7 +47,9 @@ public ClusterServerConfiguration() {
* @param numSServers number of scan servers in the default resource group
* @param numTServers number of tablet servers in the default resource group
*/
public ClusterServerConfiguration(int numCompactors, int numSServers, int numTServers) {
public ClusterServerConfiguration(int numManagers, int numCompactors, int numSServers,
int numTServers) {
this.numManagers = numManagers;
compactors = new HashMap<>();
compactors.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numCompactors);
sservers = new HashMap<>();
Expand All @@ -55,6 +58,14 @@ public ClusterServerConfiguration(int numCompactors, int numSServers, int numTSe
tservers.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numTServers);
}

public int getNumManagers() {
return this.numManagers;
}

public void setNumManagers(int num) {
this.numManagers = num;
}

public void setNumDefaultCompactors(int numCompactors) {
compactors.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numCompactors);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class MiniAccumuloClusterControl implements ClusterControl {
protected MiniAccumuloClusterImpl cluster;

Process zooKeeperProcess = null;
Process managerProcess = null;
final List<Process> managerProcesses = new ArrayList<>();
Process gcProcess = null;
Process monitor = null;
final Map<String,List<Process>> tabletServerProcesses = new HashMap<>();
Expand Down Expand Up @@ -189,8 +189,14 @@ public synchronized void start(ServerType server, Map<String,String> configOverr
}
break;
case MANAGER:
if (managerProcess == null) {
managerProcess = cluster._exec(classToUse, server, configOverrides).getProcess();
synchronized (managerProcesses) {
int count = 0;
for (int i = managerProcesses.size();
count < limit
&& i < cluster.getConfig().getClusterServerConfiguration().getNumManagers();
i++, ++count) {
managerProcesses.add(cluster._exec(classToUse, server, configOverrides).getProcess());
}
}
break;
case ZOOKEEPER:
Expand Down Expand Up @@ -258,15 +264,19 @@ public void stop(ServerType server) throws IOException {
public synchronized void stop(ServerType server, String hostname) throws IOException {
switch (server) {
case MANAGER:
if (managerProcess != null) {
synchronized (managerProcesses) {
try {
cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
log.warn("Manager did not fully stop after 30 seconds", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
for (Process manager : managerProcesses) {
try {
cluster.stopProcessWithTimeout(manager, 30, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
log.warn("Manager did not fully stop after 30 seconds", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} finally {
managerProcess = null;
managerProcesses.clear();
}
}
break;
Expand Down Expand Up @@ -392,14 +402,19 @@ public void killProcess(ServerType type, ProcessReference procRef)
boolean found = false;
switch (type) {
case MANAGER:
if (procRef.getProcess().equals(managerProcess)) {
try {
cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
log.warn("Manager did not fully stop after 30 seconds", e);
synchronized (managerProcesses) {
for (Process manager : managerProcesses) {
if (procRef.getProcess().equals(manager)) {
managerProcesses.remove(manager);
try {
cluster.stopProcessWithTimeout(manager, 30, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
log.warn("Manager did not fully stop after 30 seconds", e);
}
found = true;
break;
}
}
managerProcess = null;
found = true;
}
break;
case TABLET_SERVER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,12 @@ private void verifyUp() throws InterruptedException, IOException {

int numTries = 10;

requireNonNull(getClusterControl().managerProcess, "Error starting Manager - no process");
waitForProcessStart(getClusterControl().managerProcess, "Manager");
int mgrExpectedCount = 0;
for (Process tsp : getClusterControl().managerProcesses) {
mgrExpectedCount++;
requireNonNull(tsp, "Error starting Manager " + mgrExpectedCount + " - no process");
waitForProcessStart(tsp, "Manager" + mgrExpectedCount);
}

requireNonNull(getClusterControl().gcProcess, "Error starting GC - no process");
waitForProcessStart(getClusterControl().gcProcess, "GC");
Expand Down Expand Up @@ -875,7 +879,7 @@ List<ProcessReference> references(Process... procs) {
public Map<ServerType,Collection<ProcessReference>> getProcesses() {
Map<ServerType,Collection<ProcessReference>> result = new HashMap<>();
MiniAccumuloClusterControl control = getClusterControl();
result.put(ServerType.MANAGER, references(control.managerProcess));
result.put(ServerType.MANAGER, references(control.managerProcesses.toArray(new Process[0])));
result.put(ServerType.TABLET_SERVER, references(control.tabletServerProcesses.values().stream()
.flatMap(List::stream).collect(Collectors.toList()).toArray(new Process[0])));
result.put(ServerType.COMPACTOR, references(control.compactorProcesses.values().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public HostAndPort getAddress() {
}

private String lockString(ServiceLock mlock) {
return mlock.getLockID().serialize(context.getZooKeeperRoot() + Constants.ZMANAGER_LOCK);
return mlock.getLockID().serialize(context.getZooKeeperRoot() + Constants.ZMANAGERS);
}

private void loadTablet(TabletManagementClientService.Client client, ServiceLock lock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public ClosableIterator<TabletManagement> iterator(List<Range> ranges,
return wrapped.iterator(ranges, parameters);
}

@Override
@Deprecated
public void overrideRanges(List<Range> ranges) {
wrapped.overrideRanges(ranges);
}

@Override
public void setFutureLocations(Collection<Assignment> assignments)
throws DistributedStoreException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.accumulo.server.manager.state;

import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.accumulo.core.clientImpl.ClientContext;
Expand All @@ -28,6 +29,7 @@
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;

import com.google.common.base.Preconditions;
Expand All @@ -38,13 +40,15 @@ class MetaDataStateStore extends AbstractTabletStateStore implements TabletState
private final String targetTableName;
private final Ample ample;
private final DataLevel level;
private List<Range> ranges;

protected MetaDataStateStore(DataLevel level, ClientContext context, String targetTableName) {
super(context);
this.level = level;
this.context = context;
this.ample = context.getAmple();
this.targetTableName = targetTableName;
this.ranges = Collections.singletonList(TabletsSection.getRange());
}

MetaDataStateStore(DataLevel level, ClientContext context) {
Expand All @@ -56,6 +60,12 @@ public DataLevel getLevel() {
return level;
}

@Override
@Deprecated
public void overrideRanges(List<Range> ranges) {
this.ranges = ranges;
}

@Override
public ClosableIterator<TabletManagement> iterator(List<Range> ranges,
TabletManagementParameters parameters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,10 @@ public ClosableIterator<TabletManagement> iterator(List<Range> ranges,
public String name() {
return "Metadata Tablets";
}

@Override
public DataLevel getLevel() {
return DataLevel.METADATA;
}

}
Loading