diff --git a/base-kv/base-kv-raft-type/src/main/proto/basekv/raft/RaftMessage.proto b/base-kv/base-kv-raft-type/src/main/proto/basekv/raft/RaftMessage.proto index d17e3bbd8..5cbae84f8 100644 --- a/base-kv/base-kv-raft-type/src/main/proto/basekv/raft/RaftMessage.proto +++ b/base-kv/base-kv-raft-type/src/main/proto/basekv/raft/RaftMessage.proto @@ -118,6 +118,29 @@ message ProposeReply { message TimeoutNow { } +message RequestChangeClusterConfig { + uint32 id = 1; + string correlateId = 2; + repeated string voters = 3; + repeated string learners = 4; +} + +message RequestChangeClusterConfigReply { + enum Code { + Success = 0; + ConcurrentChange = 1; + EmptyVoters = 2; + LearnersOverlap = 3; + SlowLearner = 4; + LeaderStepDown = 5; + NoLeader = 6; + ForwardTimeout = 7; + Cancelled = 8; + } + uint32 id = 1; + Code code = 2; +} + message RaftMessage { uint64 term = 1; // sender's term oneof MessageType { @@ -134,5 +157,7 @@ message RaftMessage { Propose propose = 12; ProposeReply proposeReply = 13; TimeoutNow timeoutNow = 14; + RequestChangeClusterConfig requestChangeClusterConfig = 15; + RequestChangeClusterConfigReply requestChangeClusterConfigReply = 16; }; } diff --git a/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftConfig.java b/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftConfig.java index 0576a82b3..3d535ba5c 100644 --- a/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftConfig.java +++ b/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftConfig.java @@ -59,6 +59,8 @@ public final class RaftConfig { private int readOnlyBatch = 10; @Builder.Default private boolean disableForwardProposal = false; + @Builder.Default + private boolean disableForwardClusterConfigChange = false; // if append log entries asynchronously which is an optimization described in $10.2.1 section of raft thesis @Builder.Default private boolean asyncAppend = true; diff --git a/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftNodeStateFollower.java b/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftNodeStateFollower.java index f2ea66d28..a71665af3 100644 --- a/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftNodeStateFollower.java +++ b/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftNodeStateFollower.java @@ -44,6 +44,8 @@ import org.apache.bifromq.basekv.raft.proto.ProposeReply; import org.apache.bifromq.basekv.raft.proto.RaftMessage; import org.apache.bifromq.basekv.raft.proto.RaftNodeStatus; +import org.apache.bifromq.basekv.raft.proto.RequestChangeClusterConfig; +import org.apache.bifromq.basekv.raft.proto.RequestChangeClusterConfigReply; import org.apache.bifromq.basekv.raft.proto.RequestReadIndex; import org.apache.bifromq.basekv.raft.proto.RequestReadIndexReply; import org.apache.bifromq.basekv.raft.proto.RequestVote; @@ -56,6 +58,8 @@ class RaftNodeStateFollower extends RaftNodeState { private final Map> idToReadRequestMap; private final LinkedHashMap> tickToForwardedProposesMap; private final Map> idToForwardedProposeMap; + private final LinkedHashMap> tickToForwardedConfigChangesMap; + private final Map> idToForwardedConfigChangeMap; private int randomElectionTimeoutTick; private long currentTick; private int electionElapsedTick; @@ -113,6 +117,8 @@ class RaftNodeStateFollower extends RaftNodeState { idToReadRequestMap = new HashMap<>(); tickToForwardedProposesMap = new LinkedHashMap<>(); idToForwardedProposeMap = new HashMap<>(); + tickToForwardedConfigChangesMap = new LinkedHashMap<>(); + idToForwardedConfigChangeMap = new HashMap<>(); } @Override @@ -175,6 +181,22 @@ RaftNodeState tick() { break; } } + for (Iterator>> it = tickToForwardedConfigChangesMap.entrySet().iterator(); + it.hasNext(); ) { + Map.Entry> entry = it.next(); + if (entry.getKey() + 2L * config.getHeartbeatTimeoutTick() < currentTick) { + it.remove(); + entry.getValue().forEach(pendingConfigChangeId -> { + CompletableFuture pendingOnDone = idToForwardedConfigChangeMap.remove(pendingConfigChangeId); + if (pendingOnDone != null && !pendingOnDone.isDone()) { + log.debug("Aborted forwarded timed-out ChangeClusterConfig request[{}]", pendingConfigChangeId); + pendingOnDone.completeExceptionally(ClusterConfigChangeException.forwardTimeout()); + } + }); + } else { + break; + } + } if (electionElapsedTick >= randomElectionTimeoutTick) { electionElapsedTick = 0; abortPendingReadIndexRequests(ReadIndexException.forwardTimeout()); @@ -359,6 +381,9 @@ RaftNodeState receive(String fromPeer, RaftMessage message) { case PROPOSEREPLY: handleProposeReply(message.getProposeReply()); break; + case REQUESTCHANGECLUSTERCONFIGREPLY: + handleRequestChangeClusterConfigReply(message.getRequestChangeClusterConfigReply()); + break; case TIMEOUTNOW: nextState = handleTimeoutNow(fromPeer); break; @@ -379,8 +404,41 @@ void changeClusterConfig(String correlateId, Set newVoters, Set newLearners, CompletableFuture onDone) { - // TODO: support leader forward - onDone.completeExceptionally(ClusterConfigChangeException.notLeader()); + // Check if forward is disabled + if (config.isDisableForwardClusterConfigChange()) { + log.debug("Forward cluster config change to leader is disabled"); + onDone.completeExceptionally(ClusterConfigChangeException.forwardDisabled()); + return; + } + + // Check if we know the leader + if (currentLeader == null) { + log.debug("Dropped cluster config change due to no leader elected in current term"); + onDone.completeExceptionally(ClusterConfigChangeException.noLeader()); + return; + } + + // Generate forward request ID and track + int forwardConfigChangeId = nextForwardReqId(); + tickToForwardedConfigChangesMap.compute(currentTick, (k, v) -> { + if (v == null) { + v = new HashSet<>(); + } + v.add(forwardConfigChangeId); + return v; + }); + idToForwardedConfigChangeMap.put(forwardConfigChangeId, onDone); + + // Send forward request to leader + submitRaftMessages(currentLeader, RaftMessage.newBuilder() + .setTerm(currentTerm()) + .setRequestChangeClusterConfig(RequestChangeClusterConfig.newBuilder() + .setId(forwardConfigChangeId) + .setCorrelateId(correlateId) + .addAllVoters(newVoters) + .addAllLearners(newLearners) + .build()) + .build()); } @Override @@ -462,6 +520,32 @@ public void stop() { super.stop(); abortPendingReadIndexRequests(ReadIndexException.cancelled()); abortPendingProposeRequests(DropProposalException.cancelled()); + abortPendingConfigChangeRequests(ClusterConfigChangeException.cancelled()); + } + + private void handleRequestChangeClusterConfigReply(RequestChangeClusterConfigReply reply) { + CompletableFuture pendingOnDone = idToForwardedConfigChangeMap.get(reply.getId()); + if (pendingOnDone != null) { + switch (reply.getCode()) { + case Success -> pendingOnDone.complete(null); + case ConcurrentChange -> pendingOnDone.completeExceptionally( + ClusterConfigChangeException.concurrentChange()); + case EmptyVoters -> pendingOnDone.completeExceptionally( + ClusterConfigChangeException.emptyVoters()); + case LearnersOverlap -> pendingOnDone.completeExceptionally( + ClusterConfigChangeException.learnersOverlap()); + case SlowLearner -> pendingOnDone.completeExceptionally( + ClusterConfigChangeException.slowLearner()); + case LeaderStepDown -> pendingOnDone.completeExceptionally( + ClusterConfigChangeException.leaderStepDown()); + case NoLeader -> pendingOnDone.completeExceptionally( + ClusterConfigChangeException.noLeader()); + case ForwardTimeout -> pendingOnDone.completeExceptionally( + ClusterConfigChangeException.forwardTimeout()); + default -> pendingOnDone.completeExceptionally( + ClusterConfigChangeException.cancelled()); + } + } } private void handleAppendEntries(String fromLeader, AppendEntries appendEntries) { @@ -739,6 +823,20 @@ private void abortPendingProposeRequests(DropProposalException e) { } } + private void abortPendingConfigChangeRequests(ClusterConfigChangeException e) { + for (Iterator>> it = tickToForwardedConfigChangesMap.entrySet().iterator(); + it.hasNext(); ) { + Map.Entry> entry = it.next(); + it.remove(); + entry.getValue().forEach(pendingConfigChangeId -> { + CompletableFuture pendingOnDone = idToForwardedConfigChangeMap.remove(pendingConfigChangeId); + if (pendingOnDone != null && !pendingOnDone.isDone()) { + pendingOnDone.completeExceptionally(e); + } + }); + } + } + private boolean entryMatch(long index, long term) { Optional entry = stateStorage.entryAt(index); if (entry.isPresent()) { diff --git a/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftNodeStateLeader.java b/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftNodeStateLeader.java index fa0c3209a..cace11d7a 100644 --- a/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftNodeStateLeader.java +++ b/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/RaftNodeStateLeader.java @@ -55,6 +55,8 @@ import org.apache.bifromq.basekv.raft.proto.RaftMessage; import org.apache.bifromq.basekv.raft.proto.RaftNodeStatus; import org.apache.bifromq.basekv.raft.proto.RaftNodeSyncState; +import org.apache.bifromq.basekv.raft.proto.RequestChangeClusterConfig; +import org.apache.bifromq.basekv.raft.proto.RequestChangeClusterConfigReply; import org.apache.bifromq.basekv.raft.proto.RequestReadIndex; import org.apache.bifromq.basekv.raft.proto.RequestReadIndexReply; import org.apache.bifromq.basekv.raft.proto.Snapshot; @@ -369,6 +371,7 @@ RaftNodeState receive(String fromPeer, RaftMessage message) { } case REQUESTREADINDEX -> handleRequestReadIndex(fromPeer, message.getRequestReadIndex()); case PROPOSE -> handlePropose(fromPeer, message.getPropose()); + case REQUESTCHANGECLUSTERCONFIG -> handleRequestChangeClusterConfig(fromPeer, message.getRequestChangeClusterConfig()); case REQUESTPREVOTE -> sendRequestPreVoteReply(fromPeer, currentTerm(), false); default -> { // ignore other messages @@ -816,6 +819,50 @@ private void handlePropose(String fromPeer, Propose propose) { propose(propose.getCommand(), onDone); } + private void handleRequestChangeClusterConfig(String fromPeer, RequestChangeClusterConfig request) { + log.trace("Received forwarded ChangeClusterConfig request from peer[{}]", fromPeer); + CompletableFuture onDone = new CompletableFuture<>(); + onDone.whenComplete((v, e) -> { + RequestChangeClusterConfigReply.Builder replyBuilder = + RequestChangeClusterConfigReply.newBuilder().setId(request.getId()); + + if (e != null) { + log.debug("Failed to finish forwarded ChangeClusterConfig request from peer[{}]", fromPeer, e); + assert e instanceof ClusterConfigChangeException; + + // Map exception type to Reply Code + if (e instanceof ClusterConfigChangeException.ConcurrentChangeException) { + replyBuilder.setCode(RequestChangeClusterConfigReply.Code.ConcurrentChange); + } else if (e instanceof ClusterConfigChangeException.EmptyVotersException) { + replyBuilder.setCode(RequestChangeClusterConfigReply.Code.EmptyVoters); + } else if (e instanceof ClusterConfigChangeException.LearnersOverlapException) { + replyBuilder.setCode(RequestChangeClusterConfigReply.Code.LearnersOverlap); + } else if (e instanceof ClusterConfigChangeException.SlowLearnerException) { + replyBuilder.setCode(RequestChangeClusterConfigReply.Code.SlowLearner); + } else if (e instanceof ClusterConfigChangeException.LeaderStepDownException) { + replyBuilder.setCode(RequestChangeClusterConfigReply.Code.LeaderStepDown); + } else if (e instanceof ClusterConfigChangeException.NoLeaderException) { + replyBuilder.setCode(RequestChangeClusterConfigReply.Code.NoLeader); + } else { + replyBuilder.setCode(RequestChangeClusterConfigReply.Code.Cancelled); + } + } else { + replyBuilder.setCode(RequestChangeClusterConfigReply.Code.Success); + } + + submitRaftMessages(fromPeer, RaftMessage.newBuilder() + .setTerm(currentTerm()) + .setRequestChangeClusterConfigReply(replyBuilder.build()) + .build()); + }); + + // Call actual cluster config change logic + changeClusterConfig(request.getCorrelateId(), + new HashSet<>(request.getVotersList()), + new HashSet<>(request.getLearnersList()), + onDone); + } + private void sendTimeoutNow(String toPeer) { submitRaftMessages(toPeer, RaftMessage.newBuilder().setTerm(currentTerm()).setTimeoutNow(TimeoutNow.getDefaultInstance()).build()); diff --git a/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/exception/ClusterConfigChangeException.java b/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/exception/ClusterConfigChangeException.java index 728fa8a30..77468b72d 100644 --- a/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/exception/ClusterConfigChangeException.java +++ b/base-kv/base-kv-raft/src/main/java/org/apache/bifromq/basekv/raft/exception/ClusterConfigChangeException.java @@ -56,6 +56,14 @@ public static ClusterConfigChangeException cancelled() { return new CancelledException(); } + public static ClusterConfigChangeException forwardTimeout() { + return new ForwardTimeoutException(); + } + + public static ClusterConfigChangeException forwardDisabled() { + return new ForwardDisabledException(); + } + public static class ConcurrentChangeException extends ClusterConfigChangeException { private ConcurrentChangeException() { super("Only one on-going change is allowed"); @@ -104,4 +112,16 @@ private CancelledException() { super("Cancelled"); } } + + public static class ForwardTimeoutException extends ClusterConfigChangeException { + private ForwardTimeoutException() { + super("Forward request timeout"); + } + } + + public static class ForwardDisabledException extends ClusterConfigChangeException { + private ForwardDisabledException() { + super("Forward cluster config change is disabled"); + } + } } diff --git a/base-kv/base-kv-raft/src/test/java/org/apache/bifromq/basekv/raft/RaftNodeStateFollowerTest.java b/base-kv/base-kv-raft/src/test/java/org/apache/bifromq/basekv/raft/RaftNodeStateFollowerTest.java index 3c0077272..9ab38240f 100644 --- a/base-kv/base-kv-raft/src/test/java/org/apache/bifromq/basekv/raft/RaftNodeStateFollowerTest.java +++ b/base-kv/base-kv-raft/src/test/java/org/apache/bifromq/basekv/raft/RaftNodeStateFollowerTest.java @@ -55,6 +55,8 @@ import org.apache.bifromq.basekv.raft.proto.ProposeReply; import org.apache.bifromq.basekv.raft.proto.RaftMessage; import org.apache.bifromq.basekv.raft.proto.RaftNodeStatus; +import org.apache.bifromq.basekv.raft.proto.RequestChangeClusterConfig; +import org.apache.bifromq.basekv.raft.proto.RequestChangeClusterConfigReply; import org.apache.bifromq.basekv.raft.proto.RequestPreVote; import org.apache.bifromq.basekv.raft.proto.RequestPreVoteReply; import org.apache.bifromq.basekv.raft.proto.RequestReadIndex; @@ -264,6 +266,101 @@ public void testChangeClusterConfig() { assertTrue(onDone.isCompletedExceptionally()); } + @Test + public void testForwardChangeClusterConfig() { + AtomicInteger onMessageReadyIndex = new AtomicInteger(); + CompletableFuture onDone = new CompletableFuture<>(); + IRaftStateStore stateStorage = new InMemoryStateStore("testLocal", Snapshot.newBuilder() + .setClusterConfig(clusterConfig).build()); + + RaftNodeStateFollower follower = new RaftNodeStateFollower(1, 0, leader, defaultRaftConfig, + stateStorage, messages -> { + if (onMessageReadyIndex.get() == 0) { + onMessageReadyIndex.incrementAndGet(); + assertEquals(messages, new HashMap>() {{ + put(leader, Collections.singletonList(RaftMessage.newBuilder() + .setTerm(1) + .setRequestChangeClusterConfig(RequestChangeClusterConfig.newBuilder() + .setId(1) + .setCorrelateId("cId") + .addVoters("v3") + .addLearners("l4") + .build()) + .build())); + }}); + } + }, eventListener, snapshotInstaller, onSnapshotInstalled); + + follower.changeClusterConfig("cId", Collections.singleton("v3"), Collections.singleton("l4"), onDone); + assertFalse(onDone.isDone()); + + Map> idToForwardedConfigChangeMap = + ReflectionUtils.getField(follower, "idToForwardedConfigChangeMap"); + assertTrue(Objects.requireNonNull(idToForwardedConfigChangeMap).containsKey(1)); + + RaftMessage configChangeReply = RaftMessage.newBuilder() + .setTerm(1) + .setRequestChangeClusterConfigReply(RequestChangeClusterConfigReply.newBuilder() + .setId(1) + .setCode(RequestChangeClusterConfigReply.Code.Success) + .build()) + .build(); + follower.receive(leader, configChangeReply); + assertTrue(onDone.isDone()); + assertFalse(onDone.isCompletedExceptionally()); + } + + @Test + public void testForwardChangeClusterConfigDisabled() { + RaftConfig config = new RaftConfig() + .setDisableForwardClusterConfigChange(true); + IRaftStateStore stateStorage = new InMemoryStateStore("testLocal", Snapshot.newBuilder() + .setClusterConfig(clusterConfig).build()); + + RaftNodeStateFollower follower = new RaftNodeStateFollower(1, 0, leader, config, + stateStorage, msgSender, eventListener, snapshotInstaller, onSnapshotInstalled); + + CompletableFuture onDone = new CompletableFuture<>(); + follower.changeClusterConfig("cId", Collections.singleton("v3"), Collections.singleton("l4"), onDone); + assertTrue(onDone.isCompletedExceptionally()); + } + + @Test + public void testForwardChangeClusterConfigNoLeader() { + IRaftStateStore stateStorage = new InMemoryStateStore("testLocal", Snapshot.newBuilder() + .setClusterConfig(clusterConfig).build()); + + RaftNodeStateFollower follower = new RaftNodeStateFollower(1, 0, null, defaultRaftConfig, + stateStorage, msgSender, eventListener, snapshotInstaller, onSnapshotInstalled); + + CompletableFuture onDone = new CompletableFuture<>(); + follower.changeClusterConfig("cId", Collections.singleton("v3"), Collections.singleton("l4"), onDone); + assertTrue(onDone.isCompletedExceptionally()); + } + + @Test + public void testForwardChangeClusterConfigException() { + IRaftStateStore stateStorage = new InMemoryStateStore("testLocal", Snapshot.newBuilder() + .setClusterConfig(clusterConfig).build()); + + RaftNodeStateFollower follower = new RaftNodeStateFollower(1, 0, leader, defaultRaftConfig, + stateStorage, msgSender, eventListener, snapshotInstaller, onSnapshotInstalled); + + CompletableFuture onDone = new CompletableFuture<>(); + follower.changeClusterConfig("cId", Collections.singleton("v3"), Collections.singleton("l4"), onDone); + assertFalse(onDone.isDone()); + + RaftMessage configChangeReply = RaftMessage.newBuilder() + .setTerm(1) + .setRequestChangeClusterConfigReply(RequestChangeClusterConfigReply.newBuilder() + .setId(1) + .setCode(RequestChangeClusterConfigReply.Code.ConcurrentChange) + .build()) + .build(); + follower.receive(leader, configChangeReply); + assertTrue(onDone.isCompletedExceptionally()); + } + @Test public void testReadIndex() { AtomicInteger onMessageReadyIndex = new AtomicInteger();