From 69132f2c08b49f00317b75e4c748d89e3c58bd12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=87=E9=A2=86?= Date: Wed, 24 Sep 2025 17:15:03 +0800 Subject: [PATCH] Fixed an issue where, when enabling HA in Amoro and switching between primary and standby nodes, the data loaded by the primary node was inconsistent with the database. --- .../amoro/server/AmoroServiceContainer.java | 13 +- .../server/HighAvailabilityContainer.java | 105 ++++++ .../server/TestHighAvailabilityContainer.java | 324 ++++++++++++++++++ .../amoro/properties/AmsHAProperties.java | 5 + 4 files changed, 446 insertions(+), 1 deletion(-) create mode 100644 amoro-ams/src/test/java/org/apache/amoro/server/TestHighAvailabilityContainer.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 1f91119fc1..d28b27c08a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -136,7 +136,14 @@ public static void main(String[] args) { } catch (Exception e) { LOG.error("AMS start error", e); } finally { - service.disposeOptimizingService(); + try { + service.disposeOptimizingService(); + } catch (Exception e) { + LOG.warn("AMS dispose error", e); + } finally { + // if HA enabled, make sure the dispose complete signal is sent to ZK + service.signalDisposeComplete(); + } } } } catch (Throwable t) { @@ -153,6 +160,10 @@ public void waitFollowerShip() throws Exception { haContainer.waitFollowerShip(); } + public void signalDisposeComplete() { + haContainer.signalDisposeComplete(); + } + public void startRestServices() throws Exception { EventsManager.getInstance(); MetricManager.getInstance(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java index 6d15d37356..cf29f4ba88 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/HighAvailabilityContainer.java @@ -21,6 +21,7 @@ import org.apache.amoro.client.AmsServerInfo; import org.apache.amoro.config.Configurations; import org.apache.amoro.properties.AmsHAProperties; +import org.apache.amoro.shade.thrift.org.apache.commons.lang3.StringUtils; import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework; import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.transaction.CuratorOp; @@ -45,6 +46,8 @@ public class HighAvailabilityContainer implements LeaderLatchListener { private final CuratorFramework zkClient; private final String tableServiceMasterPath; private final String optimizingServiceMasterPath; + // path to signal that this node has completed ams dispose + private final String disposeCompletePath; private final AmsServerInfo tableServiceServerInfo; private final AmsServerInfo optimizingServiceServerInfo; private volatile CountDownLatch followerLatch; @@ -59,6 +62,7 @@ public HighAvailabilityContainer(Configurations serviceConfig) throws Exception String haClusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME); tableServiceMasterPath = AmsHAProperties.getTableServiceMasterPath(haClusterName); optimizingServiceMasterPath = AmsHAProperties.getOptimizingServiceMasterPath(haClusterName); + disposeCompletePath = AmsHAProperties.getMasterReleaseConfirmPath(haClusterName); ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000); this.zkClient = CuratorFrameworkFactory.builder() @@ -90,6 +94,7 @@ public HighAvailabilityContainer(Configurations serviceConfig) throws Exception zkClient = null; tableServiceMasterPath = null; optimizingServiceMasterPath = null; + disposeCompletePath = null; tableServiceServerInfo = null; optimizingServiceServerInfo = null; // block follower latch forever when ha is disabled @@ -102,6 +107,8 @@ public void waitLeaderShip() throws Exception { if (leaderLatch != null) { leaderLatch.await(); if (leaderLatch.hasLeadership()) { + waitPreviousLeaderDisposeComplete(); + CuratorOp tableServiceMasterPathOp = zkClient .transactionOp() @@ -134,6 +141,83 @@ public void waitFollowerShip() throws Exception { LOG.info("Became the follower of AMS"); } + public void waitPreviousLeaderDisposeComplete() throws Exception { + // 1、Create the path if it does not exist, to ensure it exists for future primary and standby + // node switchover. + if (zkClient.checkExists().forPath(disposeCompletePath) == null) { + createPathIfNeeded(disposeCompletePath); + } + + // 2、Determine if there is a previous leader, or if it is different from current node. + boolean hasPreviousOtherLeader = false; + try { + byte[] masterData = zkClient.getData().forPath(tableServiceMasterPath); + if (masterData != null && masterData.length > 0) { + String masterInfoInZkNode = new String(masterData, StandardCharsets.UTF_8); + if (!StringUtils.isEmpty(masterInfoInZkNode)) { + try { + // If data cannot be parsed correctly, it indicates that the AMS service is starting for + // the first time. + AmsServerInfo previousLeaderInfo = + JacksonUtil.parseObject(masterInfoInZkNode, AmsServerInfo.class); + if (previousLeaderInfo != null) { + // If parsing succeeds, check if it's different from current node + String currentInfoStr = JacksonUtil.toJSONString(tableServiceServerInfo); + LOG.debug( + "Current node info JSON: {}, ZK node info JSON: {}", + currentInfoStr, + masterInfoInZkNode); + if (!masterInfoInZkNode.equals(currentInfoStr)) { + hasPreviousOtherLeader = true; + } else { + LOG.debug( + "Previous leader is the same as current node (self-restart)." + + " No need to wait for dispose signal."); + } + } + } catch (Exception e) { + LOG.warn( + "Failed to parse master info from ZooKeeper: {}, treating as no previous leader", + masterInfoInZkNode, + e); + // If parsing fails, treat as no previous leader + } + } + } + } catch (KeeperException.NoNodeException e) { + // No previous leader node found, indicating that this is the first startup of ams. + LOG.debug("No previous leader node found, indicating that this is the first startup of ams."); + } + + if (!hasPreviousOtherLeader) { + LOG.debug("No previous other master detected, start service immediately."); + return; + } + + // If disposeCompletePath exists, the following scenarios may occur: + // 1) A primary-standby node switchover occurs, and the former primary + // node has not completed the AMS dispose operation. + // 2) The previous primary node is unreachable due to network issues. + // 3) No primary-standby node switchover occurred, but ZK retains + // information about the previous primary node. + long startTime = System.currentTimeMillis(); + int maxWaitTime = 30000; // 30s + while (System.currentTimeMillis() - startTime <= maxWaitTime) { + // At this point, the disposeCompletePath does not exist, + // indicating that the previous master node has completed + // the AMS service shutdown operation and deleted the path. + if (zkClient.checkExists().forPath(disposeCompletePath) == null) { + LOG.info("Previous leader has completed dispose. Proceeding."); + return; + } + } + + LOG.debug( + "Timeout ({}ms) waiting for previous other leader to signal dispose complete. Proceeding anyway. " + + "This might indicate the previous leader is unresponsive.", + maxWaitTime); + } + public void close() { if (leaderLatch != null) { try { @@ -163,6 +247,27 @@ public void notLeader() { followerLatch.countDown(); } + /** + * In HA mode, when the AMS service is stopped, delete the existing `disposeCompletePath` file + * from ZK to indicate that the AMS service has been terminated. + */ + public void signalDisposeComplete() { + // when HA is disabled, do nothing + if (zkClient == null) { + return; + } + + try { + if (zkClient.checkExists().forPath(disposeCompletePath) != null) { + zkClient.delete().forPath(disposeCompletePath); + return; + } + LOG.debug("ams dispose complete signal written."); + } catch (Exception e) { + LOG.warn("Failed to write dispose complete signal", e); + } + } + private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int restBindPort) { AmsServerInfo amsServerInfo = new AmsServerInfo(); amsServerInfo.setHost(host); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestHighAvailabilityContainer.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestHighAvailabilityContainer.java new file mode 100644 index 0000000000..1152f401c3 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestHighAvailabilityContainer.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server; + +import static org.apache.amoro.server.AmoroManagementConf.HA_CLUSTER_NAME; +import static org.apache.amoro.server.AmoroManagementConf.HA_ENABLE; +import static org.apache.amoro.server.AmoroManagementConf.HA_ZOOKEEPER_ADDRESS; +import static org.apache.amoro.server.AmoroManagementConf.SERVER_EXPOSE_HOST; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.amoro.client.AmsServerInfo; +import org.apache.amoro.config.Configurations; +import org.apache.amoro.properties.AmsHAProperties; +import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework; +import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.amoro.shade.zookeeper3.org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode; +import org.apache.amoro.utils.JacksonUtil; +import org.apache.curator.test.TestingServer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public class TestHighAvailabilityContainer { + + private TestingServer zkTestServer; + private CuratorFramework zkClient; + private String clusterName; + private String zkConnectionString; + + @BeforeEach + public void setup() throws Exception { + // Start embedded ZooKeeper test server + int port = (int) (Math.random() * 4000) + 14000; + zkTestServer = new TestingServer(port, true); + zkTestServer.start(); + // Wait for server to start + zkTestServer.restart(); + zkConnectionString = "127.0.0.1:" + port; + clusterName = "test-cluster"; + + // Create ZooKeeper client + ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); + zkClient = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy); + zkClient.start(); + // Wait for connection + if (!zkClient.blockUntilConnected(30, TimeUnit.SECONDS)) { + throw new RuntimeException("Failed to connect to ZooKeeper server"); + } + + // Clean up existing nodes + String basePath = "/" + clusterName + "/arctic/ams"; + try { + if (zkClient.checkExists().forPath(basePath) != null) { + zkClient.delete().deletingChildrenIfNeeded().forPath(basePath); + } + } catch (Exception e) { + // Ignore deletion exceptions + } + } + + @AfterEach + public void teardown() { + if (zkClient != null) { + try { + zkClient.close(); + } catch (Exception e) { + // Ignore close exceptions + } + } + if (zkTestServer != null) { + try { + zkTestServer.stop(); + } catch (Exception e) { + // Ignore close exceptions + } + } + } + + private Configurations createHAConfig() { + Configurations config = new Configurations(); + config.set(HA_ENABLE, true); + config.set(HA_ZOOKEEPER_ADDRESS, zkConnectionString); + config.set(HA_CLUSTER_NAME, clusterName); + config.set(SERVER_EXPOSE_HOST, "localhost"); + // Use existing configuration items + config.set(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT, 12345); + config.set(AmoroManagementConf.HTTP_SERVER_PORT, 19090); + config.set(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT, 12346); + return config; + } + + private void cleanupPath(String path) { + try { + if (zkClient.checkExists().forPath(path) != null) { + // If path exists, clear its data + zkClient.setData().forPath(path, new byte[0]); + } + } catch (Exception e) { + // Ignore deletion exceptions + } + } + + /** + * create previous leader info in ZooKeeper + * + * @param host + * @param thriftPort + * @param restPort + * @throws Exception + */ + private void setupPreviousLeaderInfo(String host, int thriftPort, int restPort) throws Exception { + // Set up "previous leader" information that differs from current node + String tableServiceMasterPath = AmsHAProperties.getTableServiceMasterPath(clusterName); + String optimizingServiceMasterPath = + AmsHAProperties.getOptimizingServiceMasterPath(clusterName); + + AmsServerInfo serverInfo = new AmsServerInfo(); + serverInfo.setHost(host); + serverInfo.setThriftBindPort(thriftPort); + serverInfo.setRestBindPort(restPort); + String serverInfoJson = JacksonUtil.toJSONString(serverInfo); + + try { + zkClient + .create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(tableServiceMasterPath, serverInfoJson.getBytes(StandardCharsets.UTF_8)); + zkClient + .create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(optimizingServiceMasterPath, serverInfoJson.getBytes(StandardCharsets.UTF_8)); + } catch ( + org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException.NodeExistsException + e) { + zkClient + .setData() + .forPath(tableServiceMasterPath, serverInfoJson.getBytes(StandardCharsets.UTF_8)); + zkClient + .setData() + .forPath(optimizingServiceMasterPath, serverInfoJson.getBytes(StandardCharsets.UTF_8)); + } + } + + private void executeAndWaitForLeadership( + HighAvailabilityContainer haContainer, + AtomicBoolean hasLeadership, + AtomicReference exceptionRef) + throws InterruptedException { + // Execute waitLeaderShip in another thread and wait for result + CountDownLatch leadershipLatch = new CountDownLatch(1); + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit( + () -> { + try { + haContainer.waitLeaderShip(); + hasLeadership.set(true); + leadershipLatch.countDown(); + } catch (Exception e) { + exceptionRef.set(e); + } + }); + + boolean success = leadershipLatch.await(30, TimeUnit.SECONDS); + // Check for exceptions + Exception thrownException = exceptionRef.get(); + if (thrownException != null) { + throw new AssertionError( + "Exception occurred during waitLeaderShip: " + thrownException.getMessage(), + thrownException); + } + + assertTrue(success, "Should acquire leadership within timeout"); + assertTrue(hasLeadership.get(), "Should have acquired leadership"); + executor.shutdown(); + } + + @Test + public void testWaitLeadershipWithNoPreviousLeader() throws Exception { + Configurations config = createHAConfig(); + + // Ensure nodes are cleaned before test starts to avoid using previous test data + String tableServiceMasterPath = AmsHAProperties.getTableServiceMasterPath(clusterName); + cleanupPath(tableServiceMasterPath); + + HighAvailabilityContainer haContainer = new HighAvailabilityContainer(config); + + // Simulate waiting for leadership in another thread + AtomicBoolean hasLeadership = new AtomicBoolean(false); + AtomicReference exceptionRef = new AtomicReference<>(); + executeAndWaitForLeadership(haContainer, hasLeadership, exceptionRef); + + haContainer.close(); + } + + @Test + public void testWaitLeadershipWithSamePreviousLeaderInfo() throws Exception { + Configurations config = createHAConfig(); + HighAvailabilityContainer haContainer = new HighAvailabilityContainer(config); + + // Set up "previous leader" info that matches current node (simulate restart scenario) + setupPreviousLeaderInfo("localhost", 12345, 19090); + + // Simulate waiting for leadership in another thread + AtomicBoolean hasLeadership = new AtomicBoolean(false); + AtomicReference exceptionRef = new AtomicReference<>(); + executeAndWaitForLeadership(haContainer, hasLeadership, exceptionRef); + + haContainer.close(); + } + + @Test + public void testWaitLeadershipTimeoutWaitingForPreviousLeader() throws Exception { + Configurations config = createHAConfig(); + HighAvailabilityContainer haContainer = new HighAvailabilityContainer(config); + + // Set up different "previous leader" info without creating disposeCompletePath (simulate + // unresponsive previous leader) + setupPreviousLeaderInfo("other-host", 12345, 19090); + + // Simulate waiting for leadership in another thread + CountDownLatch leadershipLatch = new CountDownLatch(1); + AtomicBoolean hasLeadership = new AtomicBoolean(false); + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit( + () -> { + try { + haContainer.waitLeaderShip(); + hasLeadership.set(true); + leadershipLatch.countDown(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + // Wait for leadership (should take over after timeout) + boolean success = + leadershipLatch.await(40, TimeUnit.SECONDS); // Wait beyond default 30s timeout + assertTrue( + success, + "Should acquire leadership after timeout even if previous leader does not respond"); + assertTrue(hasLeadership.get(), "Should have acquired leadership"); + + haContainer.close(); + executor.shutdown(); + } + + @Test + public void testSignalDisposeComplete() throws Exception { + Configurations config = createHAConfig(); + HighAvailabilityContainer haContainer = new HighAvailabilityContainer(config); + + String disposeCompletePath = AmsHAProperties.getMasterReleaseConfirmPath(clusterName); + + // Create node + try { + zkClient + .create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(disposeCompletePath); + } catch ( + org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException.NodeExistsException + e) { + // Ignore if node already exists + } + + // Ensure node exists + assertNotNull(zkClient.checkExists().forPath(disposeCompletePath)); + + // Call signal complete method + haContainer.signalDisposeComplete(); + + // Verify node has been deleted + assertNull(zkClient.checkExists().forPath(disposeCompletePath)); + + haContainer.close(); + } + + @Test + public void testSignalDisposeCompleteWithNoPath() throws Exception { + Configurations config = createHAConfig(); + HighAvailabilityContainer haContainer = new HighAvailabilityContainer(config); + + String disposeCompletePath = AmsHAProperties.getMasterReleaseConfirmPath(clusterName); + + // Ensure node does not exist + assertNull(zkClient.checkExists().forPath(disposeCompletePath)); + + // Call signal complete method - should not throw exception + assertDoesNotThrow(() -> haContainer.signalDisposeComplete()); + + haContainer.close(); + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java b/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java index e794b520da..f8038a51ca 100644 --- a/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java +++ b/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java @@ -26,6 +26,7 @@ public class AmsHAProperties { private static final String TABLE_SERVICE_MASTER_PATH = "/master"; private static final String OPTIMIZING_SERVICE_MASTER_PATH = "/optimizing-service-master"; private static final String NAMESPACE_DEFAULT = "default"; + private static final String MASTER_RELEASE_CONFIRM = "/master-release-confirm"; private static String getBasePath(String namespace) { if (Strings.isNullOrEmpty(namespace)) { @@ -42,6 +43,10 @@ public static String getOptimizingServiceMasterPath(String namespace) { return getBasePath(namespace) + OPTIMIZING_SERVICE_MASTER_PATH; } + public static String getMasterReleaseConfirmPath(String namespace) { + return getBasePath(namespace) + MASTER_RELEASE_CONFIRM; + } + public static String getLeaderPath(String namespace) { return getBasePath(namespace) + LEADER_PATH; }