Skip to content

Commit

Permalink
Add builder classes for RaftServerRpc and its implementations.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tsz-Wo Nicholas Sze committed Jan 31, 2017
1 parent 9c44237 commit d16c5c6
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 33 deletions.
65 changes: 49 additions & 16 deletions ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,82 @@
package org.apache.ratis.grpc;

import com.google.common.base.Preconditions;

import org.apache.ratis.shaded.io.grpc.Server;
import org.apache.ratis.shaded.io.grpc.ServerBuilder;
import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.client.RaftClientProtocolService;
import org.apache.ratis.grpc.server.RaftServerProtocolClient;
import org.apache.ratis.grpc.server.RaftServerProtocolService;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.shaded.io.grpc.Server;
import org.apache.ratis.shaded.io.grpc.ServerBuilder;
import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.ExitUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT;
import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.apache.ratis.grpc.RaftGrpcConfigKeys.*;

/** A grpc implementation of {@link RaftServerRpc}. */
public class RaftGRpcService implements RaftServerRpc {
static final Logger LOG = LoggerFactory.getLogger(RaftGRpcService.class);
public static final String GRPC_SEND_SERVER_REQUEST =
RaftGRpcService.class.getSimpleName() + ".sendRequest";

public static class Builder extends RaftServerRpc.Builder<Builder,RaftGRpcService> {
private int maxMessageSize = RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT;

private Builder() {
super(RAFT_GRPC_SERVER_PORT_DEFAULT);
}

public int getMaxMessageSize() {
return maxMessageSize;
}

public Builder setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
return this;
}

public Builder setFromRaftProperties(RaftProperties properties) {
setPort(properties.getInt(RAFT_GRPC_SERVER_PORT_KEY,
RAFT_GRPC_SERVER_PORT_DEFAULT));
setMaxMessageSize(properties.getInt(RAFT_GRPC_MESSAGE_MAXSIZE_KEY,
RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT));
return this;
}

@Override
public Builder getThis() {
return this;
}

@Override
public RaftGRpcService build() {
return new RaftGRpcService(getServer(), getPort(), getMaxMessageSize());
}
}

public static Builder newBuilder() {
return new Builder();
}

private final Server server;
private final InetSocketAddress address;
private final Map<String, RaftServerProtocolClient> peers =
Collections.synchronizedMap(new HashMap<>());
private final String selfId;

public RaftGRpcService(RaftServer raftServer, RaftProperties properties) {
int port = properties.getInt(RAFT_GRPC_SERVER_PORT_KEY,
RAFT_GRPC_SERVER_PORT_DEFAULT);
int maxMessageSize = properties.getInt(
RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_KEY,
RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT);
private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) {
ServerBuilder serverBuilder = ServerBuilder.forPort(port);
selfId = raftServer.getId();
server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize)
Expand All @@ -82,8 +116,7 @@ private void startService() {
try {
server.start();
} catch (IOException e) {
LOG.error("Failed to start Grpc server", e);
System.exit(1);
ExitUtils.terminate(1, "Failed to start Grpc server", e, LOG);
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ private static Map<RaftPeer, RaftGRpcService> initRpcServices(
final Map<RaftPeer, RaftGRpcService> peerRpcs = new HashMap<>();

for (RaftServerImpl s : servers) {
final RaftGRpcService rpc = new RaftGRpcService(s, prop);
final RaftGRpcService rpc = RaftGRpcService.newBuilder()
.setFromRaftProperties(prop)
.setServer(s)
.build();
peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc);
}
return peerRpcs;
Expand Down Expand Up @@ -113,7 +116,10 @@ protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException {
int oldPort = properties.getInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY,
RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT);
properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, port);
final RaftGRpcService rpc = new RaftGRpcService(server, properties);
final RaftGRpcService rpc = RaftGRpcService.newBuilder()
.setFromRaftProperties(properties)
.setServer(server)
.build();
Preconditions.checkState(
rpc.getInetSocketAddress().toString().contains(peer.getAddress()),
"address in the raft conf: %s, address in rpc server: %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
*/
package org.apache.ratis.hadooprpc.server;

import java.io.IOException;
import java.net.InetSocketAddress;

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngineShaded;
import org.apache.hadoop.ipc.RPC;
Expand All @@ -34,12 +32,7 @@
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.shaded.com.google.protobuf.BlockingService;
import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftClientProtocolService;
import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftServerProtocolService;
import org.apache.ratis.util.CodeInjectionForTesting;
Expand All @@ -48,21 +41,53 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.InetSocketAddress;

/** Server side Hadoop RPC service. */
public class HadoopRpcService implements RaftServerRpc {
public static final Logger LOG = LoggerFactory.getLogger(HadoopRpcService.class);
static final String CLASS_NAME = HadoopRpcService.class.getSimpleName();
public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest";

public static class Builder extends RaftServerRpc.Builder<Builder, HadoopRpcService> {
private Configuration conf;

private Builder() {
super(0);
}

public Configuration getConf() {
return conf;
}

public Builder setConf(Configuration conf) {
this.conf = conf;
return this;
}

@Override
public Builder getThis() {
return this;
}

@Override
public HadoopRpcService build() throws IOException {
return new HadoopRpcService(getServer(), getConf());
}
}

public static Builder newBuilder() {
return new Builder();
}

private final String id;
private final RPC.Server ipcServer;
private final InetSocketAddress ipcServerAddress;

private final PeerProxyMap<Proxy<RaftServerProtocolPB>> proxies;

public HadoopRpcService(RaftServer server, final Configuration conf)
private HadoopRpcService(RaftServer server, final Configuration conf)
throws IOException {
this.proxies = new PeerProxyMap<>(
p -> new Proxy(RaftServerProtocolPB.class, p.getAddress(), conf));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ private static Map<RaftPeer, HadoopRpcService> initRpcServices(
final Map<RaftPeer, HadoopRpcService> peerRpcs = new HashMap<>();

for(RaftServerImpl s : servers) {
final HadoopRpcService rpc = new HadoopRpcService(s, hadoopConf);
final HadoopRpcService rpc = HadoopRpcService.newBuilder()
.setServer(s)
.setConf(hadoopConf)
.build();
peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc);
}
return peerRpcs;
Expand All @@ -86,7 +89,10 @@ protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException {
hconf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, peer.getAddress());

RaftServerImpl server = servers.get(peer.getId());
final HadoopRpcService rpc = new HadoopRpcService(server, hconf);
final HadoopRpcService rpc = HadoopRpcService.newBuilder()
.setServer(server)
.setConf(hconf)
.build();
Preconditions.checkState(
rpc.getInetSocketAddress().toString().contains(peer.getAddress()),
"address in the raft conf: %s, address in rpc server: %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,27 @@ public final class NettyRpcService implements RaftServerRpc {
static final String CLASS_NAME = NettyRpcService.class.getSimpleName();
public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest";

public static class Builder extends RaftServerRpc.Builder<Builder, NettyRpcService> {
private Builder() {
super(0);
}

@Override
public Builder getThis() {
return this;
}

@Override
public NettyRpcService build() {
return new NettyRpcService(getServer(), getPort());
}
}

public static Builder newBuilder() {
return new Builder();
}


private final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName());
private final RaftServer server;
private final String id;
Expand All @@ -75,7 +96,7 @@ protected void channelRead0(ChannelHandlerContext ctx, RaftNettyServerRequestPro
}

/** Constructs a netty server with the given port. */
public NettyRpcService(int port, RaftServer server) {
private NettyRpcService(RaftServer server, int port) {
this.server = server;
this.id = server.getId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private static NettyRpcService newNettyRpcService(
RaftServerImpl s, RaftConfiguration conf) {
final String address = getAddress(s.getId(), conf);
final int port = NetUtils.newInetSocketAddress(address).getPort();
return new NettyRpcService(port, s);
return NettyRpcService.newBuilder().setServer(s).setPort(port).build();
}

private static Map<RaftPeer, NettyRpcService> initRpcServices(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,48 @@
import org.apache.ratis.server.protocol.RaftServerProtocol;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;

/**
* An server-side interface for supporting different RPC implementations
* such as Netty, gRPC and Hadoop.
*/
public interface RaftServerRpc extends RaftServerProtocol, Closeable {
/** To build {@link RaftServerRpc} objects. */
abstract class Builder<B extends Builder, RPC extends RaftServerRpc> {
private RaftServer server;
private int port;

/** Construct a builder with the default port. */
protected Builder(int defaultPort) {
this.port = defaultPort;
}

public RaftServer getServer() {
return server;
}

public B setServer(RaftServer server) {
this.server = server;
return getThis();
}

public int getPort() {
return port;
}

/** Set the port for the server to listen to. */
public B setPort(int port) {
this.port = port;
return getThis();
}

protected abstract B getThis();

public abstract RPC build() throws IOException;
}

/** Start the RPC service. */
void start();

Expand Down

0 comments on commit d16c5c6

Please sign in to comment.