Skip to content

Commit

Permalink
Raft: make setConfiguration work with Hadoop RPC.
Browse files Browse the repository at this point in the history
(cherry picked from commit 7b43f7326e407350056179f1e4d2a44ee2de1448)
  • Loading branch information
Jing9 committed Aug 17, 2016
1 parent c3ab0d5 commit 4619aca
Show file tree
Hide file tree
Showing 19 changed files with 127 additions and 122 deletions.
10 changes: 6 additions & 4 deletions raft-client/src/main/java/org/apache/raft/client/RaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ static String nextLeader(final String leaderId, final Iterator<String> i) {
return first;
}

private void refreshPeers(RaftPeer[] newPeers) {
private void refreshPeers(RaftPeer[] newPeers) throws IOException {
if (newPeers != null && newPeers.length > 0) {
peers.clear();
for (RaftPeer p : newPeers) {
peers.put(p.getId(), p);
}
// also refresh the rpc proxies for these peers
client2serverRpc.addServerProxies(Arrays.asList(newPeers));
}
}

Expand Down Expand Up @@ -95,8 +97,8 @@ public RaftClientReply setConfiguration(RaftPeer[] peersInNewConf)
}
}

private RaftClientReply sendRequest(RaftClientRequest r,
final String leader) throws InterruptedIOException {
private RaftClientReply sendRequest(RaftClientRequest r, final String leader)
throws IOException {
try {
return client2serverRpc.sendRequest(r);
} catch (NotLeaderException nle) {
Expand All @@ -118,7 +120,7 @@ private RaftClientReply sendRequest(RaftClientRequest r,
}

private void handleNotLeaderException(NotLeaderException e)
throws InterruptedIOException {
throws IOException {
LOG.debug("{}: got NotLeaderException", clientId, e);
refreshPeers(e.getPeers());
String newLeader = e.getSuggestedLeader() != null ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

import org.apache.raft.protocol.RaftClientReply;
import org.apache.raft.protocol.RaftClientRequest;
import org.apache.raft.protocol.RaftPeer;

import java.io.IOException;

public interface RaftClientRequestSender {
RaftClientReply sendRequest(RaftClientRequest request) throws IOException;

/** add rpc information of the given raft servers */
void addServerProxies(Iterable<RaftPeer> servers) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public class RaftPeer {
private final String address;

public RaftPeer(String id) {
this(id, id);
this.id = id;
this.address = null;
}

public RaftPeer(String id, InetSocketAddress address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@
import java.util.Arrays;

public class SetConfigurationRequest extends RaftClientRequest {
public SetConfigurationRequest(String requestorId, String replierId,
RaftPeer[] members) {
this(requestorId, replierId, new ConfigurationMessage(members));
}
private final RaftPeer[] peers;

public SetConfigurationRequest(String requestorId, String replierId,
ConfigurationMessage members) {
super(requestorId, replierId, members);
RaftPeer[] peers) {
super(requestorId, replierId, null);
this.peers = peers;
}

public RaftPeer[] getPeersInNewConf() {
return ((ConfigurationMessage) getMessage()).getMembers();
return peers;
}

@Override
Expand Down
41 changes: 15 additions & 26 deletions raft-common/src/main/java/org/apache/raft/util/ProtoUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ public static ByteString toByteString(byte[] bytes) {
}

public static RaftPeerProto toRaftPeerProto(RaftPeer peer) {
return RaftPeerProto.newBuilder()
.setId(peer.getId())
.setAddress(peer.getAddress())
.build();
RaftPeerProto.Builder builder = RaftPeerProto.newBuilder()
.setId(peer.getId());
if (peer.getAddress() != null) {
builder.setAddress(peer.getAddress());
}
return builder.build();
}

public static RaftPeer toRaftPeer(RaftPeerProto p) {
return new RaftPeer(p.getId(), p.getAddress());
return new RaftPeer(p.getId(), p.hasAddress() ? p.getAddress() : null);
}

public static RaftPeer[] toRaftPeerArray(List<RaftPeerProto> protos) {
Expand Down Expand Up @@ -75,18 +77,6 @@ public RaftPeerProto next() {
};
}

public static ConfigurationMessageProto toConfigurationMessageProto(
ConfigurationMessage m) {
return ConfigurationMessageProto.newBuilder()
.addAllPeers(toRaftPeerProtos(Arrays.asList(m.getMembers())))
.build();
}

public static ConfigurationMessage toConfigurationMessage(
ConfigurationMessageProto p) {
return new ConfigurationMessage(toRaftPeerArray(p.getPeersList()));
}

public static boolean isConfigurationLogEntry(LogEntryProto entry) {
return entry.getType() == LogEntryProto.Type.CONFIGURATION;
}
Expand Down Expand Up @@ -143,34 +133,33 @@ public static RaftClientRequestProto toRaftClientRequestProto(
}

public static RaftClientReplyProto toRaftClientReplyProto(
RaftClientRequestProto request, RaftClientReply reply) {
RaftRpcRequestProto request, RaftClientReply reply) {
final RaftClientReplyProto.Builder b = RaftClientReplyProto.newBuilder();
if (reply != null) {
b.setRpcReply(toRaftRpcReplyProtoBuilder(request.getRpcRequest(), reply));
b.setRpcReply(toRaftRpcReplyProtoBuilder(request, reply));
}
return b.build();
}

public static SetConfigurationRequest toSetConfigurationRequest(
SetConfigurationRequestProto p) throws InvalidProtocolBufferException {
final RaftRpcMessageProto m = p.getClientRequest().getRpcRequest().getRpcMessage();
final ConfigurationMessageProto conf = ConfigurationMessageProto.parseFrom(
p.getClientRequest().getMessage().getContent().toByteArray());
return new SetConfigurationRequest(m.getRequestorId(), m.getReplyId(),
toRaftPeerArray(conf.getPeersList()));
final RaftRpcMessageProto m = p.getRpcRequest().getRpcMessage();
final RaftPeer[] peers = toRaftPeerArray(p.getPeersList());
return new SetConfigurationRequest(m.getRequestorId(), m.getReplyId(), peers);
}

public static SetConfigurationRequestProto toSetConfigurationRequestProto(
SetConfigurationRequest request) {
return SetConfigurationRequestProto.newBuilder()
.setClientRequest(toRaftClientRequestProto(request))
.setRpcRequest(toRaftRpcRequestProtoBuilder(request))
.addAllPeers(toRaftPeerProtos(Arrays.asList(request.getPeersInNewConf())))
.build();
}

public static SetConfigurationReplyProto toSetConfigurationReplyProto(
SetConfigurationRequestProto request, RaftClientReply reply) {
return SetConfigurationReplyProto.newBuilder()
.setClientReply(toRaftClientReplyProto(request.getClientRequest(), reply))
.setClientReply(toRaftClientReplyProto(request.getRpcRequest(), reply))
.build();
}

Expand Down
22 changes: 9 additions & 13 deletions raft-common/src/main/proto/Raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@ option java_generate_equals_and_hash = true;
package common.raft;

message RaftPeerProto {
required string id = 1; // id of the peer
required string address = 2; // e.g. IP address, hostname etc.
}

message ConfigurationMessageProto {
repeated RaftPeerProto peers = 1; // the peers in the new configuration
optional string id = 1; // id of the peer
optional string address = 2; // e.g. IP address, hostname etc.
}

message RaftConfigurationProto {
Expand All @@ -54,20 +50,20 @@ message LogEntryProto {
}

message TermIndexProto {
required uint64 term = 1;
required uint64 index = 2;
optional uint64 term = 1;
optional uint64 index = 2;
}

message RaftRpcMessageProto {
required string requestorId = 1;
required string replyId = 2;
optional string requestorId = 1;
optional string replyId = 2;
}

message RaftRpcRequestProto {
required RaftRpcMessageProto rpcMessage = 1;
optional RaftRpcMessageProto rpcMessage = 1;
}

message RaftRpcReplyProto {
required RaftRpcMessageProto rpcMessage = 1;
required bool success = 2;
optional RaftRpcMessageProto rpcMessage = 1;
optional bool success = 2;
}
14 changes: 8 additions & 6 deletions raft-common/src/main/proto/RaftClientProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,24 @@ package common.raft;

import "Raft.proto";

// normal client request
message RaftClientRequestProto {
required RaftRpcRequestProto rpcRequest = 1;

required ClientMessageEntryProto message = 2;
optional RaftRpcRequestProto rpcRequest = 1;
optional ClientMessageEntryProto message = 2;
}

message RaftClientReplyProto {
required RaftRpcReplyProto rpcReply = 1;
optional RaftRpcReplyProto rpcReply = 1;
}

// setConfiguration request
message SetConfigurationRequestProto {
required RaftClientRequestProto clientRequest = 1;
optional RaftRpcRequestProto rpcRequest = 1;
repeated RaftPeerProto peers = 2;
}

message SetConfigurationReplyProto {
required RaftClientReplyProto clientReply = 1;
optional RaftClientReplyProto clientReply = 1;
}

service RaftClientProtocolService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,9 @@ public RaftClientProtocolClientSideTranslatorPB createProxy(RaftPeer p)
RaftClientProtocolPB.class, p.getAddress(), getConf());
return new RaftClientProtocolClientSideTranslatorPB(proxy);
}

@Override
public void addServerProxies(Iterable<RaftPeer> servers) throws IOException {
addPeers(servers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public RaftClientReplyProto submitClientRequest(
throw new ServiceException(ioe);
}
final RaftClientReply reply = new RaftClientReply(request, true);
return ProtoUtils.toRaftClientReplyProto(proto, reply);
return ProtoUtils.toRaftClientReplyProto(proto.getRpcRequest(), reply);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,9 @@ public void sendClientReply(RaftClientRequest request, RaftClientReply reply,
IOException ioe) throws IOException {
// TODO
}

@Override
public void addPeerProxies(Iterable<RaftPeer> peers) throws IOException {
addPeers(peers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.raft.protocol;
package org.apache.raft.hadoopRpc;

import org.apache.raft.util.ProtoUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.raft.MiniRaftCluster;
import org.apache.raft.server.RaftServerConfigKeys;
import org.apache.raft.server.RaftReconfigurationBaseTest;

public class ConfigurationMessage implements Message {
private final RaftPeer[] members;

public ConfigurationMessage(RaftPeer[] members) {
this.members = members;
}

public RaftPeer[] getMembers() {
return members;
}
import java.io.IOException;

public class TestRaftReconfigurationWithHadoopRpc
extends RaftReconfigurationBaseTest {
@Override
public byte[] getContent() {
return ProtoUtils.toConfigurationMessageProto(this).toByteArray();
public MiniRaftCluster getCluster(int peerNum) throws IOException {
final Configuration hadoopConf = new Configuration();
hadoopConf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, "0.0.0.0:0");
return new MiniRaftClusterWithHadoopRpc(peerNum, prop, hadoopConf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public class TestRaftWithHadoopRpc extends RaftBasicTests {
GenericTestUtils.setLogLevel(RPC.Server.LOG, Level.WARN);
}

private final Configuration conf = new Configuration();
private final MiniRaftClusterWithHadoopRpc cluster;

public TestRaftWithHadoopRpc() throws IOException {
Configuration conf = new Configuration();
conf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, "0.0.0.0:0");
cluster = new MiniRaftClusterWithHadoopRpc(NUM_SERVERS, getProperties(), conf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public boolean isRunning() {
return runningState.get() != RunningState.STOPPED;
}

public void kill() {
public synchronized void kill() {
changeRunningState(RunningState.STOPPED);

try {
Expand Down Expand Up @@ -645,6 +645,6 @@ public synchronized void submitLocalSyncEvent() {
}

public void addPeersToRPC(Iterable<RaftPeer> peers) throws IOException {
serverRpc.addPeers(peers);
serverRpc.addPeerProxies(peers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ void sendClientReply(RaftClientRequest request, RaftClientReply reply, IOExcepti
throws IOException;

/** add rpc information of the given peers */
void addPeers(Iterable<RaftPeer> peers) throws IOException;
void addPeerProxies(Iterable<RaftPeer> peers) throws IOException;
}
28 changes: 14 additions & 14 deletions raft-server/src/main/proto/RaftServerProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,32 @@ package common.raft;
import "Raft.proto";

message RaftServerRequestProto {
required RaftRpcRequestProto rpcRequest = 1;
optional RaftRpcRequestProto rpcRequest = 1;
}

message RaftServerReplyProto {
required RaftRpcReplyProto rpcReply = 1;
required uint64 term = 2;
optional RaftRpcReplyProto rpcReply = 1;
optional uint64 term = 2;
}

message RequestVoteRequestProto {
required RaftServerRequestProto serverRequest = 1;
required uint64 candidateTerm = 2;
optional RaftServerRequestProto serverRequest = 1;
optional uint64 candidateTerm = 2;
optional TermIndexProto candidateLastEntry = 3;
}

message RequestVoteReplyProto {
required RaftServerReplyProto serverReply = 1;
required bool shouldShutdown = 2;
optional RaftServerReplyProto serverReply = 1;
optional bool shouldShutdown = 2;
}

message AppendEntriesRequestProto {
required RaftServerRequestProto serverRequest = 1;
required uint64 leaderTerm = 2;
optional RaftServerRequestProto serverRequest = 1;
optional uint64 leaderTerm = 2;
optional TermIndexProto previousLog = 3;
repeated LogEntryProto entries = 4;
required uint64 leaderCommit = 5;
required bool initializing = 6;
optional uint64 leaderCommit = 5;
optional bool initializing = 6;
}

message AppendEntriesReplyProto {
Expand All @@ -59,9 +59,9 @@ message AppendEntriesReplyProto {
INCONSISTENCY = 3;
}

required RaftServerReplyProto serverReply = 1;
required uint64 nextIndex = 2;
required AppendResult result = 3;
optional RaftServerReplyProto serverReply = 1;
optional uint64 nextIndex = 2;
optional AppendResult result = 3;
}

message SnapshotChunkProto {
Expand Down
Loading

0 comments on commit 4619aca

Please sign in to comment.