Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,14 @@ public static void main(String[] args) {
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
service.disposeOptimizingService();
try {
service.disposeOptimizingService();
} catch (Exception e) {
LOG.warn("AMS dispose error", e);
} finally {
// if HA enabled, make sure the dispose complete signal is sent to ZK
service.signalDisposeComplete();
}
}
}
} catch (Throwable t) {
Expand All @@ -153,6 +160,10 @@ public void waitFollowerShip() throws Exception {
haContainer.waitFollowerShip();
}

/**
 * Forwards the "dispose complete" signal to the high-availability container.
 *
 * <p>Invoked from the {@code finally} branch that follows {@code disposeOptimizingService()} in
 * {@code main}, so the signal is attempted even when disposing fails.
 */
public void signalDisposeComplete() {
  this.haContainer.signalDisposeComplete();
}

public void startRestServices() throws Exception {
EventsManager.getInstance();
MetricManager.getInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.amoro.client.AmsServerInfo;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.properties.AmsHAProperties;
import org.apache.amoro.shade.thrift.org.apache.commons.lang3.StringUtils;
import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.transaction.CuratorOp;
Expand All @@ -45,6 +46,8 @@ public class HighAvailabilityContainer implements LeaderLatchListener {
private final CuratorFramework zkClient;
private final String tableServiceMasterPath;
private final String optimizingServiceMasterPath;
// path to signal that this node has completed ams dispose
private final String disposeCompletePath;
private final AmsServerInfo tableServiceServerInfo;
private final AmsServerInfo optimizingServiceServerInfo;
private volatile CountDownLatch followerLatch;
Expand All @@ -59,6 +62,7 @@ public HighAvailabilityContainer(Configurations serviceConfig) throws Exception
String haClusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
tableServiceMasterPath = AmsHAProperties.getTableServiceMasterPath(haClusterName);
optimizingServiceMasterPath = AmsHAProperties.getOptimizingServiceMasterPath(haClusterName);
disposeCompletePath = AmsHAProperties.getMasterReleaseConfirmPath(haClusterName);
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
this.zkClient =
CuratorFrameworkFactory.builder()
Expand Down Expand Up @@ -90,6 +94,7 @@ public HighAvailabilityContainer(Configurations serviceConfig) throws Exception
zkClient = null;
tableServiceMasterPath = null;
optimizingServiceMasterPath = null;
disposeCompletePath = null;
tableServiceServerInfo = null;
optimizingServiceServerInfo = null;
// block follower latch forever when ha is disabled
Expand All @@ -102,6 +107,8 @@ public void waitLeaderShip() throws Exception {
if (leaderLatch != null) {
leaderLatch.await();
if (leaderLatch.hasLeadership()) {
waitPreviousLeaderDisposeComplete();

CuratorOp tableServiceMasterPathOp =
zkClient
.transactionOp()
Expand Down Expand Up @@ -134,6 +141,83 @@ public void waitFollowerShip() throws Exception {
LOG.info("Became the follower of AMS");
}

/**
 * Holds back a freshly elected leader until the previous leader (if it was a different node) has
 * signalled that it finished disposing its AMS services.
 *
 * <p>The signal is the deletion of {@code disposeCompletePath}: this method makes sure the path
 * exists, then, when the master info stored in ZooKeeper belongs to another node, polls until
 * the path disappears or a 30 second timeout elapses. On timeout the method logs a warning and
 * returns normally so that the new leader can still start.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting between polls
 *     (the interrupt flag is restored before rethrowing)
 * @throws Exception if a ZooKeeper operation fails
 */
public void waitPreviousLeaderDisposeComplete() throws Exception {
  // 1. Create the path if it does not exist, to ensure it exists for future primary and standby
  // node switchover.
  // NOTE(review): if the previous leader already deleted the path before this point, it is
  // re-created here and the loop below can only end by timeout — confirm this is intended.
  if (zkClient.checkExists().forPath(disposeCompletePath) == null) {
    createPathIfNeeded(disposeCompletePath);
  }

  // 2. Determine if there is a previous leader, and whether it is different from current node.
  if (!isPreviousLeaderAnotherNode()) {
    LOG.debug("No previous other master detected, start service immediately.");
    return;
  }

  // 3. disposeCompletePath exists and another node was leader before us. Possible scenarios:
  //   1) A primary-standby node switchover occurs, and the former primary
  //      node has not completed the AMS dispose operation.
  //   2) The previous primary node is unreachable due to network issues.
  //   3) No primary-standby node switchover occurred, but ZK retains
  //      information about the previous primary node.
  final long maxWaitTime = 30000L; // 30s
  final long pollIntervalMs = 500L;
  final long startTime = System.currentTimeMillis();
  while (System.currentTimeMillis() - startTime <= maxWaitTime) {
    // Once the path no longer exists, the previous master node has completed
    // the AMS service shutdown operation and deleted the path.
    if (zkClient.checkExists().forPath(disposeCompletePath) == null) {
      LOG.info("Previous leader has completed dispose. Proceeding.");
      return;
    }
    // Pause between polls instead of busy-spinning against ZooKeeper.
    try {
      Thread.sleep(pollIntervalMs);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw e;
    }
  }

  // A timeout is an abnormal condition worth surfacing, hence WARN rather than DEBUG.
  LOG.warn(
      "Timeout ({}ms) waiting for previous other leader to signal dispose complete. Proceeding anyway. "
          + "This might indicate the previous leader is unresponsive.",
      maxWaitTime);
}

/**
 * Reads the master info stored under {@code tableServiceMasterPath} and tells whether it
 * describes a node other than this one.
 *
 * @return {@code true} only when the stored data parses as an {@code AmsServerInfo} and its raw
 *     JSON differs from this node's serialized server info; {@code false} when the node is
 *     missing, empty, unparsable, or equal to this node's info
 * @throws Exception if reading from ZooKeeper fails for a reason other than a missing node
 */
private boolean isPreviousLeaderAnotherNode() throws Exception {
  byte[] masterData;
  try {
    masterData = zkClient.getData().forPath(tableServiceMasterPath);
  } catch (KeeperException.NoNodeException e) {
    // No previous leader node found, indicating that this is the first startup of ams.
    LOG.debug("No previous leader node found, indicating that this is the first startup of ams.");
    return false;
  }

  if (masterData == null || masterData.length == 0) {
    return false;
  }
  String masterInfoInZkNode = new String(masterData, StandardCharsets.UTF_8);
  if (StringUtils.isEmpty(masterInfoInZkNode)) {
    return false;
  }

  try {
    // If data cannot be parsed correctly, it indicates that the AMS service is starting for
    // the first time.
    AmsServerInfo previousLeaderInfo =
        JacksonUtil.parseObject(masterInfoInZkNode, AmsServerInfo.class);
    if (previousLeaderInfo == null) {
      return false;
    }

    // Parsing succeeded: compare the raw JSON with this node's own serialized info.
    String currentInfoStr = JacksonUtil.toJSONString(tableServiceServerInfo);
    LOG.debug(
        "Current node info JSON: {}, ZK node info JSON: {}", currentInfoStr, masterInfoInZkNode);
    if (!masterInfoInZkNode.equals(currentInfoStr)) {
      return true;
    }
    LOG.debug(
        "Previous leader is the same as current node (self-restart)."
            + " No need to wait for dispose signal.");
    return false;
  } catch (Exception e) {
    // If parsing fails, treat as no previous leader
    LOG.warn(
        "Failed to parse master info from ZooKeeper: {}, treating as no previous leader",
        masterInfoInZkNode,
        e);
    return false;
  }
}

public void close() {
if (leaderLatch != null) {
try {
Expand Down Expand Up @@ -163,6 +247,27 @@ public void notLeader() {
followerLatch.countDown();
}

/**
 * In HA mode, signals that this node has finished disposing its AMS services by deleting the
 * {@code disposeCompletePath} node from ZooKeeper.
 *
 * <p>Does nothing when HA is disabled ({@code zkClient} is {@code null}) and only logs when the
 * node is already absent. Failures are logged at WARN and never propagated, so this method is
 * safe to call from a {@code finally} block during shutdown.
 */
public void signalDisposeComplete() {
  // when HA is disabled, do nothing
  if (zkClient == null) {
    return;
  }

  try {
    if (zkClient.checkExists().forPath(disposeCompletePath) == null) {
      // Nothing to delete: the signal is already in its "complete" state.
      LOG.debug("ams dispose complete path does not exist, nothing to delete.");
      return;
    }
    zkClient.delete().forPath(disposeCompletePath);
    // Log only after the delete succeeded, i.e. when the signal was really sent.
    LOG.debug("ams dispose complete signal written.");
  } catch (Exception e) {
    LOG.warn("Failed to write dispose complete signal", e);
  }
}

private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int restBindPort) {
AmsServerInfo amsServerInfo = new AmsServerInfo();
amsServerInfo.setHost(host);
Expand Down
Loading