-
Notifications
You must be signed in to change notification settings - Fork 581
fix(pd): add timeout and null-safety to getLeaderGrpcAddress() #2961
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,7 @@ | |
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.TimeoutException; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.stream.Collectors; | ||
|
|
||
|
|
@@ -49,7 +50,6 @@ | |
| import com.alipay.sofa.jraft.entity.Task; | ||
| import com.alipay.sofa.jraft.error.RaftError; | ||
| import com.alipay.sofa.jraft.option.NodeOptions; | ||
| import com.alipay.sofa.jraft.option.RaftOptions; | ||
| import com.alipay.sofa.jraft.option.RpcOptions; | ||
| import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; | ||
| import com.alipay.sofa.jraft.rpc.RpcServer; | ||
|
|
@@ -86,8 +86,12 @@ public synchronized boolean init(PDConfig.Raft config) { | |
| } | ||
| this.config = config; | ||
|
|
||
| // Wire configured rpc timeout into RaftRpcClient so the Bolt transport | ||
| // timeout and the future.get() caller timeout in getLeaderGrpcAddress() are consistent. | ||
| raftRpcClient = new RaftRpcClient(); | ||
| raftRpcClient.init(new RpcOptions()); | ||
| RpcOptions rpcOptions = new RpcOptions(); | ||
| rpcOptions.setRpcDefaultTimeout(config.getRpcTimeout()); | ||
| raftRpcClient.init(rpcOptions); | ||
|
|
||
| String raftPath = config.getDataPath() + "/" + groupId; | ||
| new File(raftPath).mkdirs(); | ||
|
|
@@ -119,10 +123,7 @@ public synchronized boolean init(PDConfig.Raft config) { | |
| nodeOptions.setRpcConnectTimeoutMs(config.getRpcTimeout()); | ||
| nodeOptions.setRpcDefaultTimeout(config.getRpcTimeout()); | ||
| nodeOptions.setRpcInstallSnapshotTimeout(config.getRpcTimeout()); | ||
| // Set the raft configuration | ||
| RaftOptions raftOptions = nodeOptions.getRaftOptions(); | ||
|
|
||
| nodeOptions.setEnableMetrics(true); | ||
| // TODO: tune RaftOptions for PD (see hugegraph-store PartitionEngine for reference) | ||
|
|
||
| final PeerId serverId = JRaftUtils.getPeerId(config.getAddress()); | ||
|
|
||
|
|
@@ -228,19 +229,49 @@ public PeerId getLeader() { | |
| } | ||
|
|
||
| /** | ||
| * Send a message to the leader to get the grpc address; | ||
| * Send a message to the leader to get the grpc address. | ||
| */ | ||
| public String getLeaderGrpcAddress() throws ExecutionException, InterruptedException { | ||
| if (isLeader()) { | ||
| return config.getGrpcAddress(); | ||
| } | ||
|
|
||
| if (raftNode.getLeaderId() == null) { | ||
| waitingForLeader(10000); | ||
| waitingForLeader(config.getRpcTimeout()); | ||
| } | ||
|
|
||
| return raftRpcClient.getGrpcAddress(raftNode.getLeaderId().getEndpoint().toString()).get() | ||
| .getGrpcAddress(); | ||
| // Cache leader to avoid repeated getLeaderId() calls and guard against | ||
| // waitingForLeader() returning without a leader being elected. | ||
| PeerId leader = raftNode.getLeaderId(); | ||
| if (leader == null) { | ||
| throw new ExecutionException(new IllegalStateException("Leader is not ready")); | ||
| } | ||
|
|
||
| try { | ||
| RaftRpcProcessor.GetMemberResponse response = raftRpcClient | ||
| .getGrpcAddress(leader.getEndpoint().toString()) | ||
| .get(config.getRpcTimeout(), TimeUnit.MILLISECONDS); | ||
bitflicker64 marked this conversation as resolved.
Show resolved
Hide resolved
bitflicker64 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (response != null && response.getGrpcAddress() != null) { | ||
| return response.getGrpcAddress(); | ||
| } | ||
| } catch (TimeoutException e) { | ||
| // TODO: a more complete fix would need a source of truth for the leader's | ||
| // actual grpcAddress rather than deriving it from the local node's port config. | ||
| throw new ExecutionException( | ||
| String.format("Timed out while resolving leader gRPC address for %s", leader), | ||
| e); | ||
| } catch (ExecutionException e) { | ||
| // TODO: a more complete fix would need a source of truth for the leader's | ||
| // actual grpcAddress rather than deriving it from the local node's port config. | ||
| Throwable cause = e.getCause() != null ? e.getCause() : e; | ||
| throw new ExecutionException( | ||
| String.format("Failed to resolve leader gRPC address for %s", leader), cause); | ||
| } | ||
|
Comment on lines
+257
to
+269
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, the PR said this method should fall back to deriving the leader gRPC address from the Raft leader endpoint plus the local gRPC port when the Bolt RPC hangs / fails / returns null. However, in the current implementation, all of those paths still end by throwing
That means the follower still does not get a usable leader gRPC address in the exact scenario this PR is trying to fix. Contextually, the call chain still looks like this: |
||
|
|
||
| log.warn("Leader gRPC address field is null in RPC response for {}", leader); | ||
| throw new ExecutionException( | ||
| new IllegalStateException( | ||
| String.format("Leader gRPC address unavailable for %s", leader))); | ||
|
Comment on lines
+271
to
+274
|
||
| } | ||
|
|
||
| /** | ||
|
|
@@ -322,14 +353,7 @@ public Status changePeerList(String peerList) { | |
| newPeers.parse(peerList); | ||
| CountDownLatch latch = new CountDownLatch(1); | ||
| this.raftNode.changePeers(newPeers, status -> { | ||
| // Use compareAndSet so a late callback does not overwrite a timeout status | ||
| result.compareAndSet(null, status); | ||
| // Refresh inside callback so it fires even if caller already timed out | ||
| // Note: changePeerList() uses Configuration.parse() which only supports | ||
| // plain comma-separated peer addresses with no learner syntax. | ||
| // getLearners() will always be empty here. Learner support is handled | ||
| // in PDService.updatePdRaft() which uses PeerUtil.parseConfig() | ||
| // and supports the /learner suffix. | ||
| if (status != null && status.isOk()) { | ||
| IpAuthHandler handler = IpAuthHandler.getInstance(); | ||
| if (handler != null) { | ||
|
|
@@ -347,16 +371,12 @@ public Status changePeerList(String peerList) { | |
| } | ||
| latch.countDown(); | ||
| }); | ||
| // Use 3x configured RPC timeout — bare await() would block forever if | ||
| // the callback is never invoked (e.g. node not started / RPC failure) | ||
| boolean completed = latch.await(3L * config.getRpcTimeout(), | ||
| TimeUnit.MILLISECONDS); | ||
| boolean completed = latch.await(3L * config.getRpcTimeout(), TimeUnit.MILLISECONDS); | ||
| if (!completed && result.get() == null) { | ||
| Status timeoutStatus = new Status(RaftError.EINTERNAL, | ||
| "changePeerList timed out after %d ms", | ||
| 3L * config.getRpcTimeout()); | ||
| if (!result.compareAndSet(null, timeoutStatus)) { | ||
| // Callback arrived just before us — keep its result | ||
| timeoutStatus = null; | ||
| } | ||
| if (timeoutStatus != null) { | ||
|
|
@@ -395,7 +415,6 @@ public PeerId waitingForLeader(long timeOut) { | |
| } | ||
| return leader; | ||
| } | ||
|
|
||
| } | ||
|
|
||
| public Node getRaftNode() { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getLeaderGrpcAddress()failure paths here. The current PD test additions exercisechangePeerList()/IpAuthHandler, but not the timeout, RPC-exception, or null-response branches introduced in this method.Could add a small UIT with a mocked
RaftRpcClientso regressions in the leader-resolution path are caught by CI.