Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,29 @@ message ProposeReply {
message TimeoutNow {
}

// Cluster config change request forwarded by a follower to the node it currently regards as leader.
message RequestChangeClusterConfig {
uint32 id = 1; // forward request id assigned by the sender; echoed back in the reply
string correlateId = 2; // correlate id supplied by the original caller of the config change
repeated string voters = 3; // requested set of voters
repeated string learners = 4; // requested set of learners
}

// Reply to a forwarded RequestChangeClusterConfig, carrying the outcome back to the forwarding node.
message RequestChangeClusterConfigReply {
// Outcome of the forwarded change; each non-Success code is translated by the receiver
// into the matching ClusterConfigChangeException.
enum Code {
Success = 0; // the config change future completed normally
ConcurrentChange = 1; // failed with ConcurrentChangeException
EmptyVoters = 2; // failed with EmptyVotersException
LearnersOverlap = 3; // failed with LearnersOverlapException
SlowLearner = 4; // failed with SlowLearnerException
LeaderStepDown = 5; // failed with LeaderStepDownException
NoLeader = 6; // failed with NoLeaderException
ForwardTimeout = 7; // translated to the forward-timeout exception on the receiving side
Cancelled = 8; // also used as the fallback for any failure without a dedicated code
}
uint32 id = 1; // id copied from the originating RequestChangeClusterConfig
Code code = 2; // outcome of the change
}

message RaftMessage {
uint64 term = 1; // sender's term
oneof MessageType {
Expand All @@ -134,5 +157,7 @@ message RaftMessage {
Propose propose = 12;
ProposeReply proposeReply = 13;
TimeoutNow timeoutNow = 14;
RequestChangeClusterConfig requestChangeClusterConfig = 15;
RequestChangeClusterConfigReply requestChangeClusterConfigReply = 16;
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public final class RaftConfig {
private int readOnlyBatch = 10;
@Builder.Default
private boolean disableForwardProposal = false;
// if true, a follower fails changeClusterConfig calls instead of forwarding them to the leader
@Builder.Default
private boolean disableForwardClusterConfigChange = false;
// if append log entries asynchronously which is an optimization described in $10.2.1 section of raft thesis
@Builder.Default
private boolean asyncAppend = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.bifromq.basekv.raft.proto.ProposeReply;
import org.apache.bifromq.basekv.raft.proto.RaftMessage;
import org.apache.bifromq.basekv.raft.proto.RaftNodeStatus;
import org.apache.bifromq.basekv.raft.proto.RequestChangeClusterConfig;
import org.apache.bifromq.basekv.raft.proto.RequestChangeClusterConfigReply;
import org.apache.bifromq.basekv.raft.proto.RequestReadIndex;
import org.apache.bifromq.basekv.raft.proto.RequestReadIndexReply;
import org.apache.bifromq.basekv.raft.proto.RequestVote;
Expand All @@ -56,6 +58,8 @@ class RaftNodeStateFollower extends RaftNodeState {
private final Map<Integer, CompletableFuture<Long>> idToReadRequestMap;
private final LinkedHashMap<Long, Set<Integer>> tickToForwardedProposesMap;
private final Map<Integer, CompletableFuture<Long>> idToForwardedProposeMap;
private final LinkedHashMap<Long, Set<Integer>> tickToForwardedConfigChangesMap;
private final Map<Integer, CompletableFuture<Void>> idToForwardedConfigChangeMap;
private int randomElectionTimeoutTick;
private long currentTick;
private int electionElapsedTick;
Expand Down Expand Up @@ -113,6 +117,8 @@ class RaftNodeStateFollower extends RaftNodeState {
idToReadRequestMap = new HashMap<>();
tickToForwardedProposesMap = new LinkedHashMap<>();
idToForwardedProposeMap = new HashMap<>();
tickToForwardedConfigChangesMap = new LinkedHashMap<>();
idToForwardedConfigChangeMap = new HashMap<>();
}

@Override
Expand Down Expand Up @@ -175,6 +181,22 @@ RaftNodeState tick() {
break;
}
}
for (Iterator<Map.Entry<Long, Set<Integer>>> it = tickToForwardedConfigChangesMap.entrySet().iterator();
it.hasNext(); ) {
Map.Entry<Long, Set<Integer>> entry = it.next();
if (entry.getKey() + 2L * config.getHeartbeatTimeoutTick() < currentTick) {
it.remove();
entry.getValue().forEach(pendingConfigChangeId -> {
CompletableFuture<Void> pendingOnDone = idToForwardedConfigChangeMap.remove(pendingConfigChangeId);
if (pendingOnDone != null && !pendingOnDone.isDone()) {
log.debug("Aborted forwarded timed-out ChangeClusterConfig request[{}]", pendingConfigChangeId);
pendingOnDone.completeExceptionally(ClusterConfigChangeException.forwardTimeout());
}
});
} else {
break;
}
}
if (electionElapsedTick >= randomElectionTimeoutTick) {
electionElapsedTick = 0;
abortPendingReadIndexRequests(ReadIndexException.forwardTimeout());
Expand Down Expand Up @@ -359,6 +381,9 @@ RaftNodeState receive(String fromPeer, RaftMessage message) {
case PROPOSEREPLY:
handleProposeReply(message.getProposeReply());
break;
case REQUESTCHANGECLUSTERCONFIGREPLY:
handleRequestChangeClusterConfigReply(message.getRequestChangeClusterConfigReply());
break;
case TIMEOUTNOW:
nextState = handleTimeoutNow(fromPeer);
break;
Expand All @@ -379,8 +404,41 @@ void changeClusterConfig(String correlateId,
Set<String> newVoters,
Set<String> newLearners,
CompletableFuture<Void> onDone) {
// TODO: support leader forward
onDone.completeExceptionally(ClusterConfigChangeException.notLeader());
// Check if forward is disabled
if (config.isDisableForwardClusterConfigChange()) {
log.debug("Forward cluster config change to leader is disabled");
onDone.completeExceptionally(ClusterConfigChangeException.forwardDisabled());
return;
}

// Check if we know the leader
if (currentLeader == null) {
log.debug("Dropped cluster config change due to no leader elected in current term");
onDone.completeExceptionally(ClusterConfigChangeException.noLeader());
return;
}

// Generate forward request ID and track
int forwardConfigChangeId = nextForwardReqId();
tickToForwardedConfigChangesMap.compute(currentTick, (k, v) -> {
if (v == null) {
v = new HashSet<>();
}
v.add(forwardConfigChangeId);
return v;
});
idToForwardedConfigChangeMap.put(forwardConfigChangeId, onDone);

// Send forward request to leader
submitRaftMessages(currentLeader, RaftMessage.newBuilder()
.setTerm(currentTerm())
.setRequestChangeClusterConfig(RequestChangeClusterConfig.newBuilder()
.setId(forwardConfigChangeId)
.setCorrelateId(correlateId)
.addAllVoters(newVoters)
.addAllLearners(newLearners)
.build())
.build());
}

@Override
Expand Down Expand Up @@ -462,6 +520,32 @@ public void stop() {
super.stop();
abortPendingReadIndexRequests(ReadIndexException.cancelled());
abortPendingProposeRequests(DropProposalException.cancelled());
abortPendingConfigChangeRequests(ClusterConfigChangeException.cancelled());
}

/**
 * Completes the pending forwarded cluster config change that the given reply answers.
 *
 * <p>The pending future is removed from {@code idToForwardedConfigChangeMap} as soon as the reply
 * arrives, so a finished request is not retained until the timeout sweep and a duplicate reply
 * for the same id is ignored. The id may still be listed in {@code tickToForwardedConfigChangesMap};
 * the sweep and abort paths already skip ids that no longer have a pending future.
 *
 * @param reply the reply received for a previously forwarded request; replies with an unknown id
 *              are ignored
 */
private void handleRequestChangeClusterConfigReply(RequestChangeClusterConfigReply reply) {
    CompletableFuture<Void> pendingOnDone = idToForwardedConfigChangeMap.remove(reply.getId());
    if (pendingOnDone == null) {
        // already timed out, aborted or answered
        return;
    }
    switch (reply.getCode()) {
        case Success -> pendingOnDone.complete(null);
        case ConcurrentChange -> pendingOnDone.completeExceptionally(
            ClusterConfigChangeException.concurrentChange());
        case EmptyVoters -> pendingOnDone.completeExceptionally(
            ClusterConfigChangeException.emptyVoters());
        case LearnersOverlap -> pendingOnDone.completeExceptionally(
            ClusterConfigChangeException.learnersOverlap());
        case SlowLearner -> pendingOnDone.completeExceptionally(
            ClusterConfigChangeException.slowLearner());
        case LeaderStepDown -> pendingOnDone.completeExceptionally(
            ClusterConfigChangeException.leaderStepDown());
        case NoLeader -> pendingOnDone.completeExceptionally(
            ClusterConfigChangeException.noLeader());
        case ForwardTimeout -> pendingOnDone.completeExceptionally(
            ClusterConfigChangeException.forwardTimeout());
        // Cancelled and any unrecognized code
        default -> pendingOnDone.completeExceptionally(
            ClusterConfigChangeException.cancelled());
    }
}

private void handleAppendEntries(String fromLeader, AppendEntries appendEntries) {
Expand Down Expand Up @@ -739,6 +823,20 @@ private void abortPendingProposeRequests(DropProposalException e) {
}
}

/**
 * Fails every still-pending forwarded cluster config change with the given exception and
 * clears the per-tick tracking entries.
 *
 * @param e the exception used to complete each pending future that is not done yet
 */
private void abortPendingConfigChangeRequests(ClusterConfigChangeException e) {
    Iterator<Set<Integer>> trackedIdsByTick = tickToForwardedConfigChangesMap.values().iterator();
    while (trackedIdsByTick.hasNext()) {
        Set<Integer> forwardedIds = trackedIdsByTick.next();
        // drop the tick entry before touching its futures
        trackedIdsByTick.remove();
        for (Integer forwardedId : forwardedIds) {
            CompletableFuture<Void> pending = idToForwardedConfigChangeMap.remove(forwardedId);
            if (pending == null || pending.isDone()) {
                continue;
            }
            pending.completeExceptionally(e);
        }
    }
}

private boolean entryMatch(long index, long term) {
Optional<LogEntry> entry = stateStorage.entryAt(index);
if (entry.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.apache.bifromq.basekv.raft.proto.RaftMessage;
import org.apache.bifromq.basekv.raft.proto.RaftNodeStatus;
import org.apache.bifromq.basekv.raft.proto.RaftNodeSyncState;
import org.apache.bifromq.basekv.raft.proto.RequestChangeClusterConfig;
import org.apache.bifromq.basekv.raft.proto.RequestChangeClusterConfigReply;
import org.apache.bifromq.basekv.raft.proto.RequestReadIndex;
import org.apache.bifromq.basekv.raft.proto.RequestReadIndexReply;
import org.apache.bifromq.basekv.raft.proto.Snapshot;
Expand Down Expand Up @@ -369,6 +371,7 @@ RaftNodeState receive(String fromPeer, RaftMessage message) {
}
case REQUESTREADINDEX -> handleRequestReadIndex(fromPeer, message.getRequestReadIndex());
case PROPOSE -> handlePropose(fromPeer, message.getPropose());
case REQUESTCHANGECLUSTERCONFIG -> handleRequestChangeClusterConfig(fromPeer, message.getRequestChangeClusterConfig());
case REQUESTPREVOTE -> sendRequestPreVoteReply(fromPeer, currentTerm(), false);
default -> {
// ignore other messages
Expand Down Expand Up @@ -816,6 +819,50 @@ private void handlePropose(String fromPeer, Propose propose) {
propose(propose.getCommand(), onDone);
}

/**
 * Handles a cluster config change forwarded by a peer: runs the change locally and sends the
 * outcome back to that peer as a {@code RequestChangeClusterConfigReply} carrying the request id.
 *
 * @param fromPeer the peer that forwarded the request and that receives the reply
 * @param request  the forwarded request
 */
private void handleRequestChangeClusterConfig(String fromPeer, RequestChangeClusterConfig request) {
    log.trace("Received forwarded ChangeClusterConfig request from peer[{}]", fromPeer);
    CompletableFuture<Void> onDone = new CompletableFuture<>();
    // register the reply callback before starting the change so an immediate completion is reported too
    onDone.whenComplete((v, e) -> {
        RequestChangeClusterConfigReply.Code outcome;
        if (e == null) {
            outcome = RequestChangeClusterConfigReply.Code.Success;
        } else {
            log.debug("Failed to finish forwarded ChangeClusterConfig request from peer[{}]", fromPeer, e);
            assert e instanceof ClusterConfigChangeException;
            outcome = replyCodeFor(e);
        }
        RequestChangeClusterConfigReply reply = RequestChangeClusterConfigReply.newBuilder()
            .setId(request.getId())
            .setCode(outcome)
            .build();
        RaftMessage replyMessage = RaftMessage.newBuilder()
            .setTerm(currentTerm())
            .setRequestChangeClusterConfigReply(reply)
            .build();
        submitRaftMessages(fromPeer, replyMessage);
    });

    Set<String> requestedVoters = new HashSet<>(request.getVotersList());
    Set<String> requestedLearners = new HashSet<>(request.getLearnersList());
    changeClusterConfig(request.getCorrelateId(), requestedVoters, requestedLearners, onDone);
}

/**
 * Translates a config change failure into the reply code sent to the forwarding peer.
 *
 * @param failure the exception the config change completed with
 * @return the dedicated code for the failure type, or {@code Cancelled} when there is none
 */
private RequestChangeClusterConfigReply.Code replyCodeFor(Throwable failure) {
    if (failure instanceof ClusterConfigChangeException.ConcurrentChangeException) {
        return RequestChangeClusterConfigReply.Code.ConcurrentChange;
    }
    if (failure instanceof ClusterConfigChangeException.EmptyVotersException) {
        return RequestChangeClusterConfigReply.Code.EmptyVoters;
    }
    if (failure instanceof ClusterConfigChangeException.LearnersOverlapException) {
        return RequestChangeClusterConfigReply.Code.LearnersOverlap;
    }
    if (failure instanceof ClusterConfigChangeException.SlowLearnerException) {
        return RequestChangeClusterConfigReply.Code.SlowLearner;
    }
    if (failure instanceof ClusterConfigChangeException.LeaderStepDownException) {
        return RequestChangeClusterConfigReply.Code.LeaderStepDown;
    }
    if (failure instanceof ClusterConfigChangeException.NoLeaderException) {
        return RequestChangeClusterConfigReply.Code.NoLeader;
    }
    return RequestChangeClusterConfigReply.Code.Cancelled;
}

private void sendTimeoutNow(String toPeer) {
submitRaftMessages(toPeer,
RaftMessage.newBuilder().setTerm(currentTerm()).setTimeoutNow(TimeoutNow.getDefaultInstance()).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public static ClusterConfigChangeException cancelled() {
return new CancelledException();
}

/**
 * Creates the exception used when a forwarded cluster config change request times out.
 *
 * @return a new {@link ForwardTimeoutException}
 */
public static ClusterConfigChangeException forwardTimeout() {
return new ForwardTimeoutException();
}

/**
 * Creates the exception used when forwarding cluster config changes is disabled.
 *
 * @return a new {@link ForwardDisabledException}
 */
public static ClusterConfigChangeException forwardDisabled() {
return new ForwardDisabledException();
}

public static class ConcurrentChangeException extends ClusterConfigChangeException {
private ConcurrentChangeException() {
super("Only one on-going change is allowed");
Expand Down Expand Up @@ -104,4 +112,16 @@ private CancelledException() {
super("Cancelled");
}
}

/**
 * Signals that a forwarded cluster config change request timed out.
 * Instances are obtained via {@code forwardTimeout()}.
 */
public static class ForwardTimeoutException extends ClusterConfigChangeException {
private ForwardTimeoutException() {
super("Forward request timeout");
}
}

/**
 * Signals that a cluster config change was rejected because forwarding is disabled.
 * Instances are obtained via {@code forwardDisabled()}.
 */
public static class ForwardDisabledException extends ClusterConfigChangeException {
private ForwardDisabledException() {
super("Forward cluster config change is disabled");
}
}
}
Loading