diff --git a/raft-client/src/main/java/org/apache/raft/client/RaftClient.java b/raft-client/src/main/java/org/apache/raft/client/RaftClient.java index a5e1fce8ba..ec4c34171f 100644 --- a/raft-client/src/main/java/org/apache/raft/client/RaftClient.java +++ b/raft-client/src/main/java/org/apache/raft/client/RaftClient.java @@ -58,12 +58,14 @@ static String nextLeader(final String leaderId, final Iterator i) { return first; } - private void refreshPeers(RaftPeer[] newPeers) { + private void refreshPeers(RaftPeer[] newPeers) throws IOException { if (newPeers != null && newPeers.length > 0) { peers.clear(); for (RaftPeer p : newPeers) { peers.put(p.getId(), p); } + // also refresh the rpc proxies for these peers + client2serverRpc.addServerProxies(Arrays.asList(newPeers)); } } @@ -95,8 +97,8 @@ public RaftClientReply setConfiguration(RaftPeer[] peersInNewConf) } } - private RaftClientReply sendRequest(RaftClientRequest r, - final String leader) throws InterruptedIOException { + private RaftClientReply sendRequest(RaftClientRequest r, final String leader) + throws IOException { try { return client2serverRpc.sendRequest(r); } catch (NotLeaderException nle) { @@ -118,7 +120,7 @@ private RaftClientReply sendRequest(RaftClientRequest r, } private void handleNotLeaderException(NotLeaderException e) - throws InterruptedIOException { + throws IOException { LOG.debug("{}: got NotLeaderException", clientId, e); refreshPeers(e.getPeers()); String newLeader = e.getSuggestedLeader() != null ? diff --git a/raft-client/src/main/java/org/apache/raft/client/RaftClientRequestSender.java b/raft-client/src/main/java/org/apache/raft/client/RaftClientRequestSender.java index d5e2cd2e62..4cc2847153 100644 --- a/raft-client/src/main/java/org/apache/raft/client/RaftClientRequestSender.java +++ b/raft-client/src/main/java/org/apache/raft/client/RaftClientRequestSender.java @@ -19,9 +19,13 @@ import org.apache.raft.protocol.RaftClientReply; import org.apache.raft.protocol.RaftClientRequest; +import org.apache.raft.protocol.RaftPeer; import java.io.IOException; public interface RaftClientRequestSender { RaftClientReply sendRequest(RaftClientRequest request) throws IOException; + + /** add rpc information of the given raft servers */ + void addServerProxies(Iterable servers) throws IOException; } diff --git a/raft-common/src/main/java/org/apache/raft/protocol/RaftPeer.java b/raft-common/src/main/java/org/apache/raft/protocol/RaftPeer.java index b05a063443..182f35d7a8 100644 --- a/raft-common/src/main/java/org/apache/raft/protocol/RaftPeer.java +++ b/raft-common/src/main/java/org/apache/raft/protocol/RaftPeer.java @@ -24,7 +24,8 @@ public class RaftPeer { private final String address; public RaftPeer(String id) { - this(id, id); + this.id = id; + this.address = null; } public RaftPeer(String id, InetSocketAddress address) { diff --git a/raft-common/src/main/java/org/apache/raft/protocol/SetConfigurationRequest.java b/raft-common/src/main/java/org/apache/raft/protocol/SetConfigurationRequest.java index 467173cc66..75769b62f2 100644 --- a/raft-common/src/main/java/org/apache/raft/protocol/SetConfigurationRequest.java +++ b/raft-common/src/main/java/org/apache/raft/protocol/SetConfigurationRequest.java @@ -20,18 +20,16 @@ import java.util.Arrays; public class SetConfigurationRequest extends RaftClientRequest { - public SetConfigurationRequest(String requestorId, String replierId, - RaftPeer[] members) { - this(requestorId, replierId, new ConfigurationMessage(members)); - } + private final RaftPeer[] peers; public SetConfigurationRequest(String requestorId, String replierId, - ConfigurationMessage members) { - super(requestorId, replierId, members); + RaftPeer[] peers) { + super(requestorId, replierId, null); + this.peers = peers; } public RaftPeer[] getPeersInNewConf() { - return ((ConfigurationMessage) getMessage()).getMembers(); + return peers; } @Override diff --git a/raft-common/src/main/java/org/apache/raft/util/ProtoUtils.java b/raft-common/src/main/java/org/apache/raft/util/ProtoUtils.java index 9b67e00359..e25098cc1f 100644 --- a/raft-common/src/main/java/org/apache/raft/util/ProtoUtils.java +++ b/raft-common/src/main/java/org/apache/raft/util/ProtoUtils.java @@ -40,14 +40,16 @@ public static ByteString toByteString(byte[] bytes) { } public static RaftPeerProto toRaftPeerProto(RaftPeer peer) { - return RaftPeerProto.newBuilder() - .setId(peer.getId()) - .setAddress(peer.getAddress()) - .build(); + RaftPeerProto.Builder builder = RaftPeerProto.newBuilder() + .setId(peer.getId()); + if (peer.getAddress() != null) { + builder.setAddress(peer.getAddress()); + } + return builder.build(); } public static RaftPeer toRaftPeer(RaftPeerProto p) { - return new RaftPeer(p.getId(), p.getAddress()); + return new RaftPeer(p.getId(), p.hasAddress() ? p.getAddress() : null); } public static RaftPeer[] toRaftPeerArray(List protos) { @@ -75,18 +77,6 @@ public RaftPeerProto next() { }; } - public static ConfigurationMessageProto toConfigurationMessageProto( - ConfigurationMessage m) { - return ConfigurationMessageProto.newBuilder() - .addAllPeers(toRaftPeerProtos(Arrays.asList(m.getMembers()))) - .build(); - } - - public static ConfigurationMessage toConfigurationMessage( - ConfigurationMessageProto p) { - return new ConfigurationMessage(toRaftPeerArray(p.getPeersList())); - } - public static boolean isConfigurationLogEntry(LogEntryProto entry) { return entry.getType() == LogEntryProto.Type.CONFIGURATION; } @@ -143,34 +133,33 @@ public static RaftClientRequestProto toRaftClientRequestProto( } public static RaftClientReplyProto toRaftClientReplyProto( - RaftClientRequestProto request, RaftClientReply reply) { + RaftRpcRequestProto request, RaftClientReply reply) { final RaftClientReplyProto.Builder b = RaftClientReplyProto.newBuilder(); if (reply != null) { - b.setRpcReply(toRaftRpcReplyProtoBuilder(request.getRpcRequest(), reply)); + b.setRpcReply(toRaftRpcReplyProtoBuilder(request, reply)); } return b.build(); } public static SetConfigurationRequest toSetConfigurationRequest( SetConfigurationRequestProto p) throws InvalidProtocolBufferException { - final RaftRpcMessageProto m = p.getClientRequest().getRpcRequest().getRpcMessage(); - final ConfigurationMessageProto conf = ConfigurationMessageProto.parseFrom( - p.getClientRequest().getMessage().getContent().toByteArray()); - return new SetConfigurationRequest(m.getRequestorId(), m.getReplyId(), - toRaftPeerArray(conf.getPeersList())); + final RaftRpcMessageProto m = p.getRpcRequest().getRpcMessage(); + final RaftPeer[] peers = toRaftPeerArray(p.getPeersList()); + return new SetConfigurationRequest(m.getRequestorId(), m.getReplyId(), peers); } public static SetConfigurationRequestProto toSetConfigurationRequestProto( SetConfigurationRequest request) { return SetConfigurationRequestProto.newBuilder() - .setClientRequest(toRaftClientRequestProto(request)) + .setRpcRequest(toRaftRpcRequestProtoBuilder(request)) + .addAllPeers(toRaftPeerProtos(Arrays.asList(request.getPeersInNewConf()))) .build(); } public static SetConfigurationReplyProto toSetConfigurationReplyProto( SetConfigurationRequestProto request, RaftClientReply reply) { return SetConfigurationReplyProto.newBuilder() - .setClientReply(toRaftClientReplyProto(request.getClientRequest(), reply)) + .setClientReply(toRaftClientReplyProto(request.getRpcRequest(), reply)) .build(); } diff --git a/raft-common/src/main/proto/Raft.proto b/raft-common/src/main/proto/Raft.proto index cb13cbb057..41f6f2875a 100644 --- a/raft-common/src/main/proto/Raft.proto +++ b/raft-common/src/main/proto/Raft.proto @@ -22,12 +22,8 @@ option java_generate_equals_and_hash = true; package common.raft; message RaftPeerProto { - required string id = 1; // id of the peer - required string address = 2; // e.g. IP address, hostname etc. -} - -message ConfigurationMessageProto { - repeated RaftPeerProto peers = 1; // the peers in the new configuration + optional string id = 1; // id of the peer + optional string address = 2; // e.g. IP address, hostname etc. } message RaftConfigurationProto { @@ -54,20 +50,20 @@ message LogEntryProto { } message TermIndexProto { - required uint64 term = 1; - required uint64 index = 2; + optional uint64 term = 1; + optional uint64 index = 2; } message RaftRpcMessageProto { - required string requestorId = 1; - required string replyId = 2; + optional string requestorId = 1; + optional string replyId = 2; } message RaftRpcRequestProto { - required RaftRpcMessageProto rpcMessage = 1; + optional RaftRpcMessageProto rpcMessage = 1; } message RaftRpcReplyProto { - required RaftRpcMessageProto rpcMessage = 1; - required bool success = 2; + optional RaftRpcMessageProto rpcMessage = 1; + optional bool success = 2; } diff --git a/raft-common/src/main/proto/RaftClientProtocol.proto b/raft-common/src/main/proto/RaftClientProtocol.proto index 043e3e9262..937e3f4567 100644 --- a/raft-common/src/main/proto/RaftClientProtocol.proto +++ b/raft-common/src/main/proto/RaftClientProtocol.proto @@ -23,22 +23,24 @@ package common.raft; import "Raft.proto"; +// normal client request message RaftClientRequestProto { - required RaftRpcRequestProto rpcRequest = 1; - - required ClientMessageEntryProto message = 2; + optional RaftRpcRequestProto rpcRequest = 1; + optional ClientMessageEntryProto message = 2; } message RaftClientReplyProto { - required RaftRpcReplyProto rpcReply = 1; + optional RaftRpcReplyProto rpcReply = 1; } +// setConfiguration request message SetConfigurationRequestProto { - required RaftClientRequestProto clientRequest = 1; + optional RaftRpcRequestProto rpcRequest = 1; + repeated RaftPeerProto peers = 2; } message SetConfigurationReplyProto { - required RaftClientReplyProto clientReply = 1; + optional RaftClientReplyProto clientReply = 1; } service RaftClientProtocolService { diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadoopRpc/client/HadoopClientRequestSender.java b/raft-hadoop/src/main/java/org/apache/raft/hadoopRpc/client/HadoopClientRequestSender.java index 26ca1ecf04..c0512efecb 100644 --- a/raft-hadoop/src/main/java/org/apache/raft/hadoopRpc/client/HadoopClientRequestSender.java +++ b/raft-hadoop/src/main/java/org/apache/raft/hadoopRpc/client/HadoopClientRequestSender.java @@ -58,4 +58,9 @@ public RaftClientProtocolClientSideTranslatorPB createProxy(RaftPeer p) RaftClientProtocolPB.class, p.getAddress(), getConf()); return new RaftClientProtocolClientSideTranslatorPB(proxy); } + + @Override + public void addServerProxies(Iterable servers) throws IOException { + addPeers(servers); + } } diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadoopRpc/client/RaftClientProtocolServerSideTranslatorPB.java b/raft-hadoop/src/main/java/org/apache/raft/hadoopRpc/client/RaftClientProtocolServerSideTranslatorPB.java index f1f18af888..c516367f01 100644 --- a/raft-hadoop/src/main/java/org/apache/raft/hadoopRpc/client/RaftClientProtocolServerSideTranslatorPB.java +++ b/raft-hadoop/src/main/java/org/apache/raft/hadoopRpc/client/RaftClientProtocolServerSideTranslatorPB.java @@ -52,7 +52,7 @@ public RaftClientReplyProto submitClientRequest( throw new ServiceException(ioe); } final RaftClientReply reply = new RaftClientReply(request, true); - return ProtoUtils.toRaftClientReplyProto(proto, reply); + return ProtoUtils.toRaftClientReplyProto(proto.getRpcRequest(), reply); } @Override diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadoopRpc/server/HadoopRpcService.java b/raft-hadoop/src/main/java/org/apache/raft/hadoopRpc/server/HadoopRpcService.java index d4a4d71490..a761d01b93 100644 --- a/raft-hadoop/src/main/java/org/apache/raft/hadoopRpc/server/HadoopRpcService.java +++ b/raft-hadoop/src/main/java/org/apache/raft/hadoopRpc/server/HadoopRpcService.java @@ -165,4 +165,9 @@ public void sendClientReply(RaftClientRequest request, RaftClientReply reply, IOException ioe) throws IOException { // TODO } + + @Override + public void addPeerProxies(Iterable peers) throws IOException { + addPeers(peers); + } } diff --git a/raft-common/src/main/java/org/apache/raft/protocol/ConfigurationMessage.java b/raft-hadoop/src/test/java/org/apache/raft/hadoopRpc/TestRaftReconfigurationWithHadoopRpc.java similarity index 57% rename from raft-common/src/main/java/org/apache/raft/protocol/ConfigurationMessage.java rename to raft-hadoop/src/test/java/org/apache/raft/hadoopRpc/TestRaftReconfigurationWithHadoopRpc.java index 6bc015f8d9..cb1178cac0 100644 --- a/raft-common/src/main/java/org/apache/raft/protocol/ConfigurationMessage.java +++ b/raft-hadoop/src/test/java/org/apache/raft/hadoopRpc/TestRaftReconfigurationWithHadoopRpc.java @@ -15,23 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.raft.protocol; +package org.apache.raft.hadoopRpc; -import org.apache.raft.util.ProtoUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.raft.MiniRaftCluster; +import org.apache.raft.server.RaftServerConfigKeys; +import org.apache.raft.server.RaftReconfigurationBaseTest; -public class ConfigurationMessage implements Message { - private final RaftPeer[] members; - - public ConfigurationMessage(RaftPeer[] members) { - this.members = members; - } - - public RaftPeer[] getMembers() { - return members; - } +import java.io.IOException; +public class TestRaftReconfigurationWithHadoopRpc + extends RaftReconfigurationBaseTest { @Override - public byte[] getContent() { - return ProtoUtils.toConfigurationMessageProto(this).toByteArray(); + public MiniRaftCluster getCluster(int peerNum) throws IOException { + final Configuration hadoopConf = new Configuration(); + hadoopConf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, "0.0.0.0:0"); + return new MiniRaftClusterWithHadoopRpc(peerNum, prop, hadoopConf); } } diff --git a/raft-hadoop/src/test/java/org/apache/raft/hadoopRpc/TestRaftWithHadoopRpc.java b/raft-hadoop/src/test/java/org/apache/raft/hadoopRpc/TestRaftWithHadoopRpc.java index a0865b04c8..13d1b0f9fd 100644 --- a/raft-hadoop/src/test/java/org/apache/raft/hadoopRpc/TestRaftWithHadoopRpc.java +++ b/raft-hadoop/src/test/java/org/apache/raft/hadoopRpc/TestRaftWithHadoopRpc.java @@ -38,10 +38,10 @@ public class TestRaftWithHadoopRpc extends RaftBasicTests { GenericTestUtils.setLogLevel(RPC.Server.LOG, Level.WARN); } - private final Configuration conf = new Configuration(); private final MiniRaftClusterWithHadoopRpc cluster; public TestRaftWithHadoopRpc() throws IOException { + Configuration conf = new Configuration(); conf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, "0.0.0.0:0"); cluster = new MiniRaftClusterWithHadoopRpc(NUM_SERVERS, getProperties(), conf); } diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java b/raft-server/src/main/java/org/apache/raft/server/RaftServer.java index 38fcbbb945..68f37dad54 100644 --- a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java +++ b/raft-server/src/main/java/org/apache/raft/server/RaftServer.java @@ -163,7 +163,7 @@ public boolean isRunning() { return runningState.get() != RunningState.STOPPED; } - public void kill() { + public synchronized void kill() { changeRunningState(RunningState.STOPPED); try { @@ -645,6 +645,6 @@ public synchronized void submitLocalSyncEvent() { } public void addPeersToRPC(Iterable peers) throws IOException { - serverRpc.addPeers(peers); + serverRpc.addPeerProxies(peers); } } diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java b/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java index cb807b50da..a6552705b3 100644 --- a/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java +++ b/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java @@ -44,5 +44,5 @@ void sendClientReply(RaftClientRequest request, RaftClientReply reply, IOExcepti throws IOException; /** add rpc information of the given peers */ - void addPeers(Iterable peers) throws IOException; + void addPeerProxies(Iterable peers) throws IOException; } diff --git a/raft-server/src/main/proto/RaftServerProtocol.proto b/raft-server/src/main/proto/RaftServerProtocol.proto index 1988f30dde..2c6202f425 100644 --- a/raft-server/src/main/proto/RaftServerProtocol.proto +++ b/raft-server/src/main/proto/RaftServerProtocol.proto @@ -24,32 +24,32 @@ package common.raft; import "Raft.proto"; message RaftServerRequestProto { - required RaftRpcRequestProto rpcRequest = 1; + optional RaftRpcRequestProto rpcRequest = 1; } message RaftServerReplyProto { - required RaftRpcReplyProto rpcReply = 1; - required uint64 term = 2; + optional RaftRpcReplyProto rpcReply = 1; + optional uint64 term = 2; } message RequestVoteRequestProto { - required RaftServerRequestProto serverRequest = 1; - required uint64 candidateTerm = 2; + optional RaftServerRequestProto serverRequest = 1; + optional uint64 candidateTerm = 2; optional TermIndexProto candidateLastEntry = 3; } message RequestVoteReplyProto { - required RaftServerReplyProto serverReply = 1; - required bool shouldShutdown = 2; + optional RaftServerReplyProto serverReply = 1; + optional bool shouldShutdown = 2; } message AppendEntriesRequestProto { - required RaftServerRequestProto serverRequest = 1; - required uint64 leaderTerm = 2; + optional RaftServerRequestProto serverRequest = 1; + optional uint64 leaderTerm = 2; optional TermIndexProto previousLog = 3; repeated LogEntryProto entries = 4; - required uint64 leaderCommit = 5; - required bool initializing = 6; + optional uint64 leaderCommit = 5; + optional bool initializing = 6; } message AppendEntriesReplyProto { @@ -59,9 +59,9 @@ message AppendEntriesReplyProto { INCONSISTENCY = 3; } - required RaftServerReplyProto serverReply = 1; - required uint64 nextIndex = 2; - required AppendResult result = 3; + optional RaftServerReplyProto serverReply = 1; + optional uint64 nextIndex = 2; + optional AppendResult result = 3; } message SnapshotChunkProto { diff --git a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java b/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java index b291646c2e..765c2353ab 100644 --- a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java +++ b/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java @@ -186,6 +186,10 @@ public void startServer(String id) { server.start(); } + private RaftPeer getPeer(RaftServer s) { + return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress()); + } + /** * prepare the peer list when removing some peers from the conf */ @@ -194,7 +198,7 @@ public PeerChanges removePeers(int number, boolean removeLeader, Collection peers = new ArrayList<>(conf.getPeers()); List removedPeers = new ArrayList<>(number); if (removeLeader) { - final RaftPeer leader = new RaftPeer(getLeader().getId()); + final RaftPeer leader = getPeer(getLeader()); assert !excluded.contains(leader); peers.remove(leader); removedPeers.add(leader); @@ -202,7 +206,7 @@ public PeerChanges removePeers(int number, boolean removeLeader, List followers = getFollowers(); for (int i = 0, removed = 0; i < followers.size() && removed < (removeLeader ? number - 1 : number); i++) { - RaftPeer toRemove = new RaftPeer(followers.get(i).getId()); + RaftPeer toRemove = getPeer(followers.get(i)); if (!excluded.contains(toRemove)) { peers.remove(toRemove); removedPeers.add(toRemove); @@ -245,22 +249,22 @@ public String printAllLogs() { public RaftServer getLeader() { final List leaders = new ArrayList<>(); - for(RaftServer s : servers.values()) { - if (s.isRunning() && s.isLeader()) { - if (leaders.isEmpty()) { - leaders.add(s); - } else { - final long leaderTerm = leaders.get(0).getState().getCurrentTerm(); - final long term = s.getState().getCurrentTerm(); - if (term >= leaderTerm) { - if (term > leaderTerm) { - leaders.clear(); - } - leaders.add(s); + servers.values().stream() + .filter(s -> s.isRunning() && s.isLeader()) + .forEach(s -> { + if (leaders.isEmpty()) { + leaders.add(s); + } else { + final long leaderTerm = leaders.get(0).getState().getCurrentTerm(); + final long term = s.getState().getCurrentTerm(); + if (term >= leaderTerm) { + if (term > leaderTerm) { + leaders.clear(); } + leaders.add(s); } } - } + }); if (leaders.isEmpty()) { return null; } else if (leaders.size() != 1) { diff --git a/raft-server/src/test/java/org/apache/raft/server/TestRaftReconfiguration.java b/raft-server/src/test/java/org/apache/raft/server/RaftReconfigurationBaseTest.java similarity index 93% rename from raft-server/src/test/java/org/apache/raft/server/TestRaftReconfiguration.java rename to raft-server/src/test/java/org/apache/raft/server/RaftReconfigurationBaseTest.java index f49b0d567a..0a5512a625 100644 --- a/raft-server/src/test/java/org/apache/raft/server/TestRaftReconfiguration.java +++ b/raft-server/src/test/java/org/apache/raft/server/RaftReconfigurationBaseTest.java @@ -28,12 +28,10 @@ import org.apache.raft.protocol.RaftClientReply; import org.apache.raft.protocol.RaftPeer; import org.apache.raft.protocol.SetConfigurationRequest; -import org.apache.raft.server.simulation.MiniRaftClusterWithSimulatedRpc; import org.apache.raft.server.simulation.RequestHandler; import org.apache.raft.server.storage.MemoryRaftLog; import org.apache.raft.server.storage.RaftLog; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; import org.slf4j.Logger; @@ -48,21 +46,18 @@ import static java.util.Arrays.asList; import static org.apache.raft.RaftTestUtil.waitAndCheckNewConf; -public class TestRaftReconfiguration { +public abstract class RaftReconfigurationBaseTest { static { GenericTestUtils.setLogLevel(RaftServer.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(MemoryRaftLog.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } - static final Logger LOG = LoggerFactory.getLogger(TestRaftReconfiguration.class); + static final Logger LOG = LoggerFactory.getLogger(RaftReconfigurationBaseTest.class); - private final RaftProperties prop = new RaftProperties(); + protected final RaftProperties prop = new RaftProperties(); - @Before - public void setup() { - prop.setBoolean(RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY, false); - } + public abstract MiniRaftCluster getCluster(int peerNum) throws IOException; /** * add 2 new peers (3 peers -> 5 peers), no leader change @@ -70,7 +65,7 @@ public void setup() { @Test public void testAddPeers() throws Exception { LOG.info("Start testAddPeers"); - MiniRaftCluster cluster = new MiniRaftClusterWithSimulatedRpc(3, prop); + MiniRaftCluster cluster = getCluster(3); cluster.start(); try { RaftTestUtil.waitForLeader(cluster); @@ -97,7 +92,7 @@ public void testAddPeers() throws Exception { @Test public void testRemovePeers() throws Exception { LOG.info("Start testRemovePeers"); - MiniRaftCluster cluster = new MiniRaftClusterWithSimulatedRpc(5, prop); + MiniRaftCluster cluster = getCluster(5); cluster.start(); try { RaftTestUtil.waitForLeader(cluster); @@ -135,7 +130,7 @@ public void testLeaderStepDown() throws Exception { } private void testAddRemovePeers(boolean leaderStepdown) throws Exception { - MiniRaftCluster cluster = new MiniRaftClusterWithSimulatedRpc(5, prop); + MiniRaftCluster cluster = getCluster(5); cluster.start(); try { RaftTestUtil.waitForLeader(cluster); @@ -160,7 +155,7 @@ private void testAddRemovePeers(boolean leaderStepdown) throws Exception { @Test(timeout = 30000) public void testReconfTwice() throws Exception { LOG.info("Start testReconfTwice"); - final MiniRaftCluster cluster = new MiniRaftClusterWithSimulatedRpc(3, prop); + final MiniRaftCluster cluster = getCluster(3); cluster.start(); try { RaftTestUtil.waitForLeader(cluster); @@ -225,7 +220,7 @@ public void testReconfTwice() throws Exception { public void testReconfTimeout() throws Exception { LOG.info("Start testReconfTimeout"); // originally 3 peers - final MiniRaftCluster cluster = new MiniRaftClusterWithSimulatedRpc(3, prop); + final MiniRaftCluster cluster = getCluster(3); cluster.start(); try { RaftTestUtil.waitForLeader(cluster); @@ -265,7 +260,7 @@ public void testReconfTimeout() throws Exception { public void testBootstrapReconf() throws Exception { LOG.info("Start testBootstrapReconf"); // originally 3 peers - final MiniRaftCluster cluster = new MiniRaftClusterWithSimulatedRpc(3, prop); + final MiniRaftCluster cluster = getCluster(3); cluster.start(); try { RaftTestUtil.waitForLeader(cluster); @@ -316,7 +311,7 @@ public void testBootstrapReconf() throws Exception { public void testKillLeaderDuringReconf() throws Exception { LOG.info("Start testKillLeaderDuringReconf"); // originally 3 peers - final MiniRaftCluster cluster = new MiniRaftClusterWithSimulatedRpc(3, prop); + final MiniRaftCluster cluster = getCluster(3); cluster.start(); try { RaftTestUtil.waitForLeader(cluster); diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedClientRequestReply.java b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedClientRequestReply.java index 9325742ce3..7018c8f20c 100644 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedClientRequestReply.java +++ b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedClientRequestReply.java @@ -22,6 +22,7 @@ import org.apache.raft.protocol.RaftClientRequest; import org.apache.raft.protocol.RaftPeer; +import java.io.IOException; import java.util.Collection; public class SimulatedClientRequestReply @@ -31,4 +32,9 @@ public class SimulatedClientRequestReply int simulateLatencyMs) { super(allPeers, simulateLatencyMs); } + + @Override + public void addServerProxies(Iterable servers) throws IOException { + // do nothing + } } diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java index 7b5378b1f2..bcb82c9386 100644 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java +++ b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java @@ -84,7 +84,7 @@ public void sendClientReply(RaftClientRequest request, RaftClientReply reply, IO } @Override - public void addPeers(Iterable peers) throws IOException { + public void addPeerProxies(Iterable peers) throws IOException { // do nothing }