From fec29639f24952a5f9f6cdd23ee633c8c45177e2 Mon Sep 17 00:00:00 2001 From: bitflicker64 Date: Thu, 5 Mar 2026 13:42:24 +0530 Subject: [PATCH 1/4] fix(pd): add timeout and null-safety to getLeaderGrpcAddress() The bolt RPC call in getLeaderGrpcAddress() returns null in Docker bridge network mode, causing NPE when a follower PD node attempts to discover the leader's gRPC address. This breaks store registration and partition distribution when any node other than pd0 wins the raft leader election. Add a bounded timeout using the configured rpc-timeout, null-check the RPC response, and fall back to deriving the address from the raft endpoint IP when the RPC fails. Closes apache/hugegraph#2959 --- .../apache/hugegraph/pd/raft/RaftEngine.java | 56 ++++++++++++------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java index b73364ae6d..2f7c01cb50 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java @@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -49,7 +50,6 @@ import com.alipay.sofa.jraft.entity.Task; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.option.NodeOptions; -import com.alipay.sofa.jraft.option.RaftOptions; import com.alipay.sofa.jraft.option.RpcOptions; import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; import com.alipay.sofa.jraft.rpc.RpcServer; @@ -86,8 +86,12 @@ public synchronized boolean init(PDConfig.Raft config) { } this.config = config; + // Wire configured rpc timeout into RaftRpcClient so the Bolt transport + // timeout and the future.get() caller timeout in getLeaderGrpcAddress() are consistent. raftRpcClient = new RaftRpcClient(); - raftRpcClient.init(new RpcOptions()); + RpcOptions rpcOptions = new RpcOptions(); + rpcOptions.setRpcDefaultTimeout(config.getRpcTimeout()); + raftRpcClient.init(rpcOptions); String raftPath = config.getDataPath() + "/" + groupId; new File(raftPath).mkdirs(); @@ -119,8 +123,7 @@ public synchronized boolean init(PDConfig.Raft config) { nodeOptions.setRpcConnectTimeoutMs(config.getRpcTimeout()); nodeOptions.setRpcDefaultTimeout(config.getRpcTimeout()); nodeOptions.setRpcInstallSnapshotTimeout(config.getRpcTimeout()); - // Set the raft configuration - RaftOptions raftOptions = nodeOptions.getRaftOptions(); + // TODO: tune RaftOptions for PD (see hugegraph-store PartitionEngine for reference) nodeOptions.setEnableMetrics(true); @@ -228,7 +231,7 @@ public PeerId getLeader() { } /** - * Send a message to the leader to get the grpc address; + * Send a message to the leader to get the grpc address. */ public String getLeaderGrpcAddress() throws ExecutionException, InterruptedException { if (isLeader()) { @@ -236,11 +239,34 @@ public String getLeaderGrpcAddress() throws ExecutionException, InterruptedExcep } if (raftNode.getLeaderId() == null) { - waitingForLeader(10000); + waitingForLeader(config.getRpcTimeout()); } - return raftRpcClient.getGrpcAddress(raftNode.getLeaderId().getEndpoint().toString()).get() - .getGrpcAddress(); + // Cache leader to avoid repeated getLeaderId() calls and guard against + // waitingForLeader() returning without a leader being elected. + PeerId leader = raftNode.getLeaderId(); + if (leader == null) { + throw new ExecutionException(new IllegalStateException("Leader is not ready")); + } + + try { + RaftRpcProcessor.GetMemberResponse response = raftRpcClient + .getGrpcAddress(leader.getEndpoint().toString()) + .get(config.getRpcTimeout(), TimeUnit.MILLISECONDS); + if (response != null && response.getGrpcAddress() != null) { + return response.getGrpcAddress(); + } + } catch (TimeoutException | ExecutionException e) { + // TODO: a more complete fix would need a source of truth for the leader's + // actual grpcAddress rather than deriving it from the local node's port config. + throw new ExecutionException( + String.format("Failed to resolve leader gRPC address for %s", leader), e); + } + + log.warn("Leader gRPC address field is null in RPC response for {}", leader); + throw new ExecutionException( + new IllegalStateException( + String.format("Leader gRPC address unavailable for %s", leader))); } /** @@ -322,14 +348,7 @@ public Status changePeerList(String peerList) { newPeers.parse(peerList); CountDownLatch latch = new CountDownLatch(1); this.raftNode.changePeers(newPeers, status -> { - // Use compareAndSet so a late callback does not overwrite a timeout status result.compareAndSet(null, status); - // Refresh inside callback so it fires even if caller already timed out - // Note: changePeerList() uses Configuration.parse() which only supports - // plain comma-separated peer addresses with no learner syntax. - // getLearners() will always be empty here. Learner support is handled - // in PDService.updatePdRaft() which uses PeerUtil.parseConfig() - // and supports the /learner suffix. if (status != null && status.isOk()) { IpAuthHandler handler = IpAuthHandler.getInstance(); if (handler != null) { @@ -347,16 +366,12 @@ public Status changePeerList(String peerList) { } latch.countDown(); }); - // Use 3x configured RPC timeout — bare await() would block forever if - // the callback is never invoked (e.g. node not started / RPC failure) - boolean completed = latch.await(3L * config.getRpcTimeout(), - TimeUnit.MILLISECONDS); + boolean completed = latch.await(3L * config.getRpcTimeout(), TimeUnit.MILLISECONDS); if (!completed && result.get() == null) { Status timeoutStatus = new Status(RaftError.EINTERNAL, "changePeerList timed out after %d ms", 3L * config.getRpcTimeout()); if (!result.compareAndSet(null, timeoutStatus)) { - // Callback arrived just before us — keep its result timeoutStatus = null; } if (timeoutStatus != null) { @@ -395,7 +410,6 @@ public PeerId waitingForLeader(long timeOut) { } return leader; } - } public Node getRaftNode() { From 9a131917baa3ae89d40d21bbf1015ae1666b85d5 Mon Sep 17 00:00:00 2001 From: bitflicker64 Date: Thu, 12 Mar 2026 18:38:56 +0530 Subject: [PATCH 2/4] ci: trigger rerun From 8576da05376c37638f6c2410926320bb707958e5 Mon Sep 17 00:00:00 2001 From: bitflicker64 Date: Wed, 18 Mar 2026 00:34:55 +0530 Subject: [PATCH 3/4] fix: split ExecutionException catch and remove duplicate setEnableMetrics --- .../org/apache/hugegraph/pd/raft/RaftEngine.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java index 2f7c01cb50..8e01e5bc43 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java @@ -125,8 +125,6 @@ public synchronized boolean init(PDConfig.Raft config) { nodeOptions.setRpcInstallSnapshotTimeout(config.getRpcTimeout()); // TODO: tune RaftOptions for PD (see hugegraph-store PartitionEngine for reference) - nodeOptions.setEnableMetrics(true); - final PeerId serverId = JRaftUtils.getPeerId(config.getAddress()); rpcServer = createRaftRpcServer(config.getAddress(), initConf.getPeers()); @@ -256,11 +254,18 @@ public String getLeaderGrpcAddress() throws ExecutionException, InterruptedExcep if (response != null && response.getGrpcAddress() != null) { return response.getGrpcAddress(); } - } catch (TimeoutException | ExecutionException e) { + } catch (TimeoutException e) { + // TODO: a more complete fix would need a source of truth for the leader's + // actual grpcAddress rather than deriving it from the local node's port config. + throw new ExecutionException( + String.format("Timed out while resolving leader gRPC address for %s", leader), + e); + } catch (ExecutionException e) { // TODO: a more complete fix would need a source of truth for the leader's // actual grpcAddress rather than deriving it from the local node's port config. + Throwable cause = e.getCause() != null ? e.getCause() : e; throw new ExecutionException( - String.format("Failed to resolve leader gRPC address for %s", leader), e); + String.format("Failed to resolve leader gRPC address for %s", leader), cause); } log.warn("Leader gRPC address field is null in RPC response for {}", leader); From 596bee18fcf328eb1a1a7e2c144c8aad8eecf8cc Mon Sep 17 00:00:00 2001 From: bitflicker64 Date: Thu, 19 Mar 2026 13:53:27 +0530 Subject: [PATCH 4/4] fix: restore best-effort fallback, improve logging and add unit tests for getLeaderGrpcAddress() --- .../apache/hugegraph/pd/raft/RaftEngine.java | 36 ++-- .../hugegraph/pd/core/PDCoreSuiteTest.java | 2 + .../pd/raft/RaftEngineLeaderAddressTest.java | 181 ++++++++++++++++++ 3 files changed, 205 insertions(+), 14 deletions(-) create mode 100644 hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/raft/RaftEngineLeaderAddressTest.java diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java index 8e01e5bc43..4b4fd7fe87 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java @@ -247,31 +247,39 @@ public String getLeaderGrpcAddress() throws ExecutionException, InterruptedExcep throw new ExecutionException(new IllegalStateException("Leader is not ready")); } + RaftRpcProcessor.GetMemberResponse response = null; try { - RaftRpcProcessor.GetMemberResponse response = raftRpcClient + // TODO: a more complete fix would need a source of truth for the leader's + // actual grpcAddress rather than deriving it from the local node's port config. + response = raftRpcClient .getGrpcAddress(leader.getEndpoint().toString()) .get(config.getRpcTimeout(), TimeUnit.MILLISECONDS); if (response != null && response.getGrpcAddress() != null) { return response.getGrpcAddress(); } + if (response == null) { + log.warn("Leader RPC response is null for {}, falling back to derived address", + leader); + } else { + log.warn("Leader gRPC address field is null in RPC response for {}, " + + "falling back to derived address", leader); + } } catch (TimeoutException e) { - // TODO: a more complete fix would need a source of truth for the leader's - // actual grpcAddress rather than deriving it from the local node's port config. - throw new ExecutionException( - String.format("Timed out while resolving leader gRPC address for %s", leader), - e); + log.warn("Timed out resolving leader gRPC address for {}, falling back to derived " + + "address", leader); } catch (ExecutionException e) { - // TODO: a more complete fix would need a source of truth for the leader's - // actual grpcAddress rather than deriving it from the local node's port config. Throwable cause = e.getCause() != null ? e.getCause() : e; - throw new ExecutionException( - String.format("Failed to resolve leader gRPC address for %s", leader), cause); + log.warn("Failed to resolve leader gRPC address for {}, falling back to derived " + + "address", leader, cause); } - log.warn("Leader gRPC address field is null in RPC response for {}", leader); - throw new ExecutionException( - new IllegalStateException( - String.format("Leader gRPC address unavailable for %s", leader))); + // Best-effort fallback: derive from leader raft endpoint IP + local gRPC port. + // WARNING: this may be incorrect in clusters where PD nodes use different grpc.port + // values , a proper fix requires a cluster-wide source of truth for gRPC addresses. + String derived = leader.getEndpoint().getIp() + ":" + config.getGrpcPort(); + log.warn("Using derived leader gRPC address {} — may be incorrect if nodes use different ports", + derived); + return derived; } /** diff --git a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/core/PDCoreSuiteTest.java b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/core/PDCoreSuiteTest.java index 57fd367171..87d1500bcb 100644 --- a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/core/PDCoreSuiteTest.java +++ b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/core/PDCoreSuiteTest.java @@ -21,6 +21,7 @@ import org.apache.hugegraph.pd.core.store.HgKVStoreImplTest; import org.apache.hugegraph.pd.raft.IpAuthHandlerTest; import org.apache.hugegraph.pd.raft.RaftEngineIpAuthIntegrationTest; +import org.apache.hugegraph.pd.raft.RaftEngineLeaderAddressTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -40,6 +41,7 @@ TaskScheduleServiceTest.class, IpAuthHandlerTest.class, RaftEngineIpAuthIntegrationTest.class, + RaftEngineLeaderAddressTest.class, // StoreNodeServiceTest.class, }) @Slf4j diff --git a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/raft/RaftEngineLeaderAddressTest.java b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/raft/RaftEngineLeaderAddressTest.java new file mode 100644 index 0000000000..0011eba203 --- /dev/null +++ b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/raft/RaftEngineLeaderAddressTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.pd.raft; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hugegraph.pd.config.PDConfig; +import org.apache.hugegraph.testutil.Whitebox; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.util.Endpoint; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RaftEngineLeaderAddressTest { + + private static final String LEADER_IP = "10.0.0.1"; + private static final int GRPC_PORT = 8686; + private static final String LEADER_GRPC_ADDRESS = "10.0.0.1:8686"; + + private Node originalRaftNode; + private RaftRpcClient originalRaftRpcClient; + private PDConfig.Raft originalConfig; + + private Node mockNode; + private RaftRpcClient mockRpcClient; + private PDConfig.Raft mockConfig; + private PeerId mockLeader; + + @Before + public void setUp() { + RaftEngine engine = RaftEngine.getInstance(); + + // Save originals + originalRaftNode = engine.getRaftNode(); + originalRaftRpcClient = Whitebox.getInternalState(engine, "raftRpcClient"); + originalConfig = Whitebox.getInternalState(engine, "config"); + + // Build mock leader PeerId with real Endpoint + mockLeader = mock(PeerId.class); + Endpoint endpoint = new Endpoint(LEADER_IP, 8610); + when(mockLeader.getEndpoint()).thenReturn(endpoint); + + // Build mock Node that reports itself as follower with a known leader + mockNode = mock(Node.class); + when(mockNode.isLeader(true)).thenReturn(false); + when(mockNode.getLeaderId()).thenReturn(mockLeader); + + // Build mock config + // Use a short timeout (100ms) so the null-leader test doesn't block for seconds + mockConfig = mock(PDConfig.Raft.class); + when(mockConfig.getGrpcAddress()).thenReturn("127.0.0.1:" + GRPC_PORT); + when(mockConfig.getGrpcPort()).thenReturn(GRPC_PORT); + when(mockConfig.getRpcTimeout()).thenReturn(100); + + // Build mock RpcClient + mockRpcClient = mock(RaftRpcClient.class); + + // Inject mocks + Whitebox.setInternalState(engine, "raftNode", mockNode); + Whitebox.setInternalState(engine, "raftRpcClient", mockRpcClient); + Whitebox.setInternalState(engine, "config", mockConfig); + } + + @After + public void tearDown() { + RaftEngine engine = RaftEngine.getInstance(); + Whitebox.setInternalState(engine, "raftNode", originalRaftNode); + Whitebox.setInternalState(engine, "raftRpcClient", originalRaftRpcClient); + Whitebox.setInternalState(engine, "config", originalConfig); + } + + @Test + public void testSuccessReturnsGrpcAddress() throws Exception { + // RPC succeeds and returns a valid gRPC address + RaftRpcProcessor.GetMemberResponse response = + mock(RaftRpcProcessor.GetMemberResponse.class); + when(response.getGrpcAddress()).thenReturn(LEADER_GRPC_ADDRESS); + + CompletableFuture future = + CompletableFuture.completedFuture(response); + when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future); + + String result = RaftEngine.getInstance().getLeaderGrpcAddress(); + Assert.assertEquals(LEADER_GRPC_ADDRESS, result); + } + + @Test + public void testTimeoutFallsBackToDerivedAddress() throws Exception { + // RPC times out — should fall back to leaderIp:grpcPort + CompletableFuture future = + mock(CompletableFuture.class); + when(future.get(anyLong(), eq(TimeUnit.MILLISECONDS))) + .thenThrow(new TimeoutException("simulated timeout")); + when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future); + + String result = RaftEngine.getInstance().getLeaderGrpcAddress(); + Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result); + } + + @Test + public void testRpcExceptionFallsBackToDerivedAddress() throws Exception { + // RPC throws ExecutionException — should fall back to leaderIp:grpcPort + CompletableFuture future = + mock(CompletableFuture.class); + when(future.get(anyLong(), eq(TimeUnit.MILLISECONDS))) + .thenThrow(new ExecutionException("simulated rpc failure", + new RuntimeException("bolt error"))); + when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future); + + String result = RaftEngine.getInstance().getLeaderGrpcAddress(); + Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result); + } + + @Test + public void testNullResponseFallsBackToDerivedAddress() throws Exception { + // RPC returns null response — should fall back to leaderIp:grpcPort + CompletableFuture future = + CompletableFuture.completedFuture(null); + when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future); + + String result = RaftEngine.getInstance().getLeaderGrpcAddress(); + Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result); + } + + @Test + public void testNullGrpcAddressInResponseFallsBackToDerivedAddress() throws Exception { + // RPC returns a response but grpcAddress field is null — should fall back + RaftRpcProcessor.GetMemberResponse response = + mock(RaftRpcProcessor.GetMemberResponse.class); + when(response.getGrpcAddress()).thenReturn(null); + + CompletableFuture future = + CompletableFuture.completedFuture(response); + when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future); + + String result = RaftEngine.getInstance().getLeaderGrpcAddress(); + Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result); + } + + @Test + public void testNullLeaderAfterWaitThrowsExecutionException() throws Exception { + // Leader is still null after waitingForLeader() — should throw ExecutionException + when(mockNode.getLeaderId()).thenReturn(null); + + try { + RaftEngine.getInstance().getLeaderGrpcAddress(); + Assert.fail("Expected ExecutionException"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof IllegalStateException); + Assert.assertEquals("Leader is not ready", e.getCause().getMessage()); + } + } +}