Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,22 @@ Map<String, Pair<Boolean, String>> getSafeModeRuleStatuses()
*/
boolean forceExitSafeMode() throws IOException;

/**
 * Checks whether one specific SCM node is in safe mode.
 *
 * @param nodeId ID of the SCM node to query
 * @return {@code true} if the node is in safe mode, {@code false} otherwise
 * @throws IOException if the query cannot be completed
 */
boolean inSafeModeForNode(String nodeId) throws IOException;

/**
 * Gets the safe mode rule statuses from one specific SCM node.
 *
 * @param nodeId ID of the SCM node to query
 * @return map from rule name to that rule's status, expressed as a pair of
 *         a boolean flag and a status text
 * @throws IOException if the query cannot be completed
 */
Map<String, Pair<Boolean, String>> getSafeModeRuleStatusesForNode(String nodeId) throws IOException;

/**
* Start ReplicationManager.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ public interface StorageContainerLocationProtocol extends Closeable {
Type.StopReplicationManager,
Type.ForceExitSafeMode));

/**
 * Read-only command types that a follower SCM may answer itself.
 * The server-side translator does not raise a not-leader error for these
 * types, so a request sent to a particular SCM (for example one selected
 * with the --scm parameter) is answered by that SCM rather than redirected
 * to the leader. The set is unmodifiable.
 */
Set<Type> FOLLOWER_READABLE_COMMAND_TYPE = Collections.unmodifiableSet(EnumSet.of(
Type.InSafeMode,
Type.GetSafeModeRuleStatuses));

/**
* Asks SCM where a container should be allocated. SCM responds with the
* set of datanodes that should be used creating this container.
Expand Down Expand Up @@ -390,6 +398,26 @@ Map<String, Pair<Boolean, String>> getSafeModeRuleStatuses()
*/
boolean forceExitSafeMode() throws IOException;

/**
 * Checks whether one specific SCM node is in safe mode.
 * In HA clusters the query is directed at the node named by {@code nodeId}
 * instead of whichever node is currently the leader.
 *
 * @param nodeId ID of the SCM node to query
 * @return {@code true} if the node is in safe mode, {@code false} otherwise
 * @throws IOException if the query cannot be completed
 */
boolean inSafeModeForNode(String nodeId) throws IOException;

/**
 * Gets the safe mode rule statuses from one specific SCM node.
 * In HA clusters the query is directed at the node named by {@code nodeId}
 * instead of whichever node is currently the leader.
 *
 * @param nodeId ID of the SCM node to query
 * @return map from rule name to that rule's status, expressed as a pair of
 *         a boolean flag and a status text
 * @throws IOException if the query cannot be completed
 */
Map<String, Pair<Boolean, String>> getSafeModeRuleStatusesForNode(String nodeId) throws IOException;

/**
* Start ReplicationManager.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@
import org.apache.hadoop.ozone.upgrade.UpgradeFinalization.StatusAndMessages;
import org.apache.hadoop.ozone.util.ProtobufUtils;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is the client-side translator to translate the requests made on
Expand All @@ -162,6 +164,8 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB

private final StorageContainerLocationProtocolPB rpcProxy;
private final SCMContainerLocationFailoverProxyProvider fpp;
// SLF4J logger named after this client-side translator class.
private static final Logger LOG =
LoggerFactory.getLogger(StorageContainerLocationProtocolClientSideTranslatorPB.class);

/**
* Creates a new StorageContainerLocationProtocolClientSideTranslatorPB.
Expand Down Expand Up @@ -200,6 +204,34 @@ private ScmContainerLocationResponse submitRequest(
return response;
}

/**
 * Wraps a request and sends it to one explicitly chosen SCM node, using the
 * proxy that the failover proxy provider returns for that node ID.
 *
 * @param nodeId the SCM node ID to send the request to
 * @param type the command type stamped on the wrapped request
 * @param builderConsumer callback that fills in the request-specific fields
 * @return the response returned by the chosen node's proxy
 * @throws IOException if the proxy lookup fails, or the RPC fails with a
 *         {@code ServiceException} (rethrown as its remote exception)
 */
private ScmContainerLocationResponse submitRequestToNode(
    String nodeId,
    StorageContainerLocationProtocolProtos.Type type,
    Consumer<Builder> builderConsumer) throws IOException {
  // Resolve the target proxy first; a failure here surfaces as IOException.
  final StorageContainerLocationProtocolPB nodeProxy = fpp.getProxyForNode(nodeId);

  final Builder requestBuilder = ScmContainerLocationRequest.newBuilder();
  requestBuilder.setCmdType(type);
  requestBuilder.setVersion(ClientVersion.CURRENT_VERSION);
  requestBuilder.setTraceID(TracingUtil.exportCurrentSpan());
  builderConsumer.accept(requestBuilder);
  final ScmContainerLocationRequest request = requestBuilder.build();

  try {
    return nodeProxy.submitRequest(NULL_RPC_CONTROLLER, request);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}

private ScmContainerLocationResponse submitRpcRequest(
ScmContainerLocationRequest wrapper) throws ServiceException {
if (!ADMIN_COMMAND_TYPE.contains(wrapper.getCmdType())) {
Expand Down Expand Up @@ -843,13 +875,21 @@ public Map<String, Pair<Boolean, String>> getSafeModeRuleStatuses()
submitRequest(Type.GetSafeModeRuleStatuses,
builder -> builder.setGetSafeModeRuleStatusesRequest(request))
.getGetSafeModeRuleStatusesResponse();
Map<String, Pair<Boolean, String>> map = new HashMap();
for (SafeModeRuleStatusProto statusProto :
response.getSafeModeRuleStatusesProtoList()) {
map.put(statusProto.getRuleName(),
return buildSafeModeRuleStatusesMap(response);
}

/**
 * Builds a rule-status map from a {@link GetSafeModeRuleStatusesResponseProto}.
 * Each status entry in the response contributes one mapping: rule name to a
 * pair of (validate flag, status text).
 *
 * @param response the response whose rule status entries are read
 * @return a new mutable map from rule name to (validate flag, status text)
 */
private Map<String, Pair<Boolean, String>> buildSafeModeRuleStatusesMap(
    GetSafeModeRuleStatusesResponseProto response) {
  Map<String, Pair<Boolean, String>> ruleStatuses = new HashMap<>();
  for (SafeModeRuleStatusProto statusProto : response.getSafeModeRuleStatusesProtoList()) {
    ruleStatuses.put(statusProto.getRuleName(),
        Pair.of(statusProto.getValidate(), statusProto.getStatusText()));
  }
  // The stale "return map;" was dropped: "map" is not declared in this helper.
  return ruleStatuses;
}

/**
Expand All @@ -870,6 +910,24 @@ public boolean forceExitSafeMode() throws IOException {

}

/**
 * Sends an InSafeMode request to the given SCM node and returns the flag
 * carried in its response.
 *
 * @param nodeId the SCM node ID to query
 * @return the in-safe-mode flag from the node's response
 * @throws IOException if the request to the node fails
 */
@Override
public boolean inSafeModeForNode(String nodeId) throws IOException {
  final InSafeModeRequestProto emptyRequest = InSafeModeRequestProto.getDefaultInstance();
  final ScmContainerLocationResponse reply = submitRequestToNode(
      nodeId,
      Type.InSafeMode,
      b -> b.setInSafeModeRequest(emptyRequest));
  return reply.getInSafeModeResponse().getInSafeMode();
}

/**
 * Sends a GetSafeModeRuleStatuses request to the given SCM node and converts
 * the response into a map keyed by rule name.
 *
 * @param nodeId the SCM node ID to query
 * @return map from rule name to that rule's status
 * @throws IOException if the request to the node fails
 */
@Override
public Map<String, Pair<Boolean, String>> getSafeModeRuleStatusesForNode(String nodeId) throws IOException {
  final GetSafeModeRuleStatusesRequestProto emptyRequest =
      GetSafeModeRuleStatusesRequestProto.getDefaultInstance();
  final ScmContainerLocationResponse reply = submitRequestToNode(
      nodeId,
      Type.GetSafeModeRuleStatuses,
      b -> b.setGetSafeModeRuleStatusesRequest(emptyRequest));
  return buildSafeModeRuleStatusesMap(reply.getGetSafeModeRuleStatusesResponse());
}

@Override
public void startReplicationManager() throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,17 @@ public synchronized List<T> getProxies() {
.map(proxyInfo -> proxyInfo.proxy).collect(Collectors.toList());
}

/**
 * Returns the proxy for the given SCM node ID.
 * An entry already present in {@code scmProxies} is used as is; otherwise
 * the node ID must be a key of {@code scmProxyInfoMap}, and a proxy is
 * obtained through {@code createSCMProxy}.
 *
 * @param nodeId the SCM node ID whose proxy is wanted
 * @return the proxy for that node
 * @throws IOException if the node ID is not a known SCM node
 */
public synchronized T getProxyForNode(String nodeId) throws IOException {
  final ProxyInfo<T> existing = scmProxies.get(nodeId);
  if (existing != null) {
    return existing.proxy;
  }
  if (!scmProxyInfoMap.containsKey(nodeId)) {
    throw new IOException("Unknown SCM node ID: " + nodeId);
  }
  return createSCMProxy(nodeId).proxy;
}

@Override
public synchronized void performFailover(T newLeader) {
if (updatedLeaderNodeID != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type.ListContainer;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type.ListPipelines;
import static org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol.ADMIN_COMMAND_TYPE;
import static org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol.FOLLOWER_READABLE_COMMAND_TYPE;

import com.google.protobuf.ProtocolMessageEnum;
import com.google.protobuf.RpcController;
Expand Down Expand Up @@ -210,9 +211,12 @@ public StorageContainerLocationProtocolServerSideTranslatorPB(
@Override
public ScmContainerLocationResponse submitRequest(RpcController controller,
ScmContainerLocationRequest request) throws ServiceException {
// not leader or not belong to admin command.
// Trigger not leader exception unless:
// This is the leader node, or this is an admin command,
// or this is a follower-readable command.
if (!scm.checkLeader()
&& !ADMIN_COMMAND_TYPE.contains(request.getCmdType())) {
&& !ADMIN_COMMAND_TYPE.contains(request.getCmdType())
&& !FOLLOWER_READABLE_COMMAND_TYPE.contains(request.getCmdType())) {
RatisUtil.checkRatisException(
scm.getScmHAManager().getRatisServer().triggerNotLeaderException(),
scm.getClientRpcPort(), scm.getScmId(), scm.getHostname(), ROLE_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,42 @@ public Map<String, Pair<Boolean, String>> getSafeModeRuleStatuses()
}
}

/**
 * Reports this server's safe mode state via {@code inSafeMode()}.
 * The supplied node ID is only recorded in the audit entry; it does not
 * influence the result.
 *
 * @param nodeId the SCM node ID named by the caller, used for auditing
 * @return the value returned by {@code inSafeMode()}
 * @throws IOException if {@code inSafeMode()} fails
 */
@Override
public boolean inSafeModeForNode(String nodeId) throws IOException {
  final Map<String, String> auditParams = Maps.newHashMap();
  auditParams.put("nodeId", nodeId);
  try {
    final boolean safeMode = inSafeMode();
    AUDIT.logReadSuccess(buildAuditMessageForSuccess(SCMAction.IN_SAFE_MODE, auditParams));
    return safeMode;
  } catch (Exception e) {
    // Record the failure in the audit log, then let the exception propagate.
    AUDIT.logReadFailure(buildAuditMessageForFailure(SCMAction.IN_SAFE_MODE, auditParams, e));
    throw e;
  }
}

/**
 * Reports this server's safe mode rule statuses via
 * {@code getSafeModeRuleStatuses()}.
 * The supplied node ID is only recorded in the audit entry; it does not
 * influence the result.
 *
 * @param nodeId the SCM node ID named by the caller, used for auditing
 * @return the map returned by {@code getSafeModeRuleStatuses()}
 * @throws IOException if {@code getSafeModeRuleStatuses()} fails
 */
@Override
public Map<String, Pair<Boolean, String>> getSafeModeRuleStatusesForNode(String nodeId) throws IOException {
  final Map<String, String> auditParams = Maps.newHashMap();
  auditParams.put("nodeId", nodeId);
  try {
    final Map<String, Pair<Boolean, String>> statuses = getSafeModeRuleStatuses();
    AUDIT.logReadSuccess(
        buildAuditMessageForSuccess(SCMAction.GET_SAFE_MODE_RULE_STATUSES, auditParams));
    return statuses;
  } catch (Exception e) {
    // Record the failure in the audit log, then let the exception propagate.
    AUDIT.logReadFailure(
        buildAuditMessageForFailure(SCMAction.GET_SAFE_MODE_RULE_STATUSES, auditParams, e));
    throw e;
  }
}

/**
* Force SCM out of Safe mode.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,16 @@ public boolean forceExitSafeMode() throws IOException {
return storageContainerLocationClient.forceExitSafeMode();
}

/**
 * Delegates the per-node safe mode check to the storage container location client.
 *
 * @param nodeId the SCM node ID to query
 * @return the value returned by the underlying client
 * @throws IOException if the underlying client call fails
 */
@Override
public boolean inSafeModeForNode(String nodeId) throws IOException {
  final boolean nodeInSafeMode = storageContainerLocationClient.inSafeModeForNode(nodeId);
  return nodeInSafeMode;
}

/**
 * Delegates the per-node rule status query to the storage container location client.
 *
 * @param nodeId the SCM node ID to query
 * @return the map returned by the underlying client
 * @throws IOException if the underlying client call fails
 */
@Override
public Map<String, Pair<Boolean, String>> getSafeModeRuleStatusesForNode(String nodeId) throws IOException {
  final Map<String, Pair<Boolean, String>> nodeRuleStatuses =
      storageContainerLocationClient.getSafeModeRuleStatusesForNode(nodeId);
  return nodeRuleStatuses;
}

@Override
public void startReplicationManager() throws IOException {
storageContainerLocationClient.startReplicationManager();
Expand Down
Loading