From d16c5c649311c8af3340c3373593821ba67d467f Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Tue, 31 Jan 2017 23:54:26 +0800 Subject: [PATCH] Add builder classes for RaftServerRpc and its implementations. --- .../apache/ratis/grpc/RaftGRpcService.java | 65 ++++++++++++++----- .../ratis/grpc/MiniRaftClusterWithGRpc.java | 10 ++- .../hadooprpc/server/HadoopRpcService.java | 47 ++++++++++---- .../MiniRaftClusterWithHadoopRpc.java | 10 ++- .../ratis/netty/server/NettyRpcService.java | 23 ++++++- .../ratis/netty/MiniRaftClusterWithNetty.java | 2 +- .../apache/ratis/server/RaftServerRpc.java | 35 ++++++++++ 7 files changed, 159 insertions(+), 33 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java index b61e70efa9..9ea23c3b0e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -18,11 +18,6 @@ package org.apache.ratis.grpc; import com.google.common.base.Preconditions; - -import org.apache.ratis.shaded.io.grpc.Server; -import org.apache.ratis.shaded.io.grpc.ServerBuilder; -import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder; -import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.client.RaftClientProtocolService; import org.apache.ratis.grpc.server.RaftServerProtocolClient; @@ -30,36 +25,75 @@ import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.shaded.io.grpc.Server; +import org.apache.ratis.shaded.io.grpc.ServerBuilder; +import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder; +import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.util.CodeInjectionForTesting; +import org.apache.ratis.util.ExitUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT; -import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY; - import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import static org.apache.ratis.grpc.RaftGrpcConfigKeys.*; + +/** A grpc implementation of {@link RaftServerRpc}. */ public class RaftGRpcService implements RaftServerRpc { static final Logger LOG = LoggerFactory.getLogger(RaftGRpcService.class); public static final String GRPC_SEND_SERVER_REQUEST = RaftGRpcService.class.getSimpleName() + ".sendRequest"; + public static class Builder extends RaftServerRpc.Builder { + private int maxMessageSize = RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT; + + private Builder() { + super(RAFT_GRPC_SERVER_PORT_DEFAULT); + } + + public int getMaxMessageSize() { + return maxMessageSize; + } + + public Builder setMaxMessageSize(int maxMessageSize) { + this.maxMessageSize = maxMessageSize; + return this; + } + + public Builder setFromRaftProperties(RaftProperties properties) { + setPort(properties.getInt(RAFT_GRPC_SERVER_PORT_KEY, + RAFT_GRPC_SERVER_PORT_DEFAULT)); + setMaxMessageSize(properties.getInt(RAFT_GRPC_MESSAGE_MAXSIZE_KEY, + RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT)); + return this; + } + + @Override + public Builder getThis() { + return this; + } + + @Override + public RaftGRpcService build() { + return new RaftGRpcService(getServer(), getPort(), getMaxMessageSize()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + private final Server server; private final InetSocketAddress address; private final Map peers = Collections.synchronizedMap(new HashMap<>()); private final String selfId; - public RaftGRpcService(RaftServer raftServer, RaftProperties properties) { - int port = properties.getInt(RAFT_GRPC_SERVER_PORT_KEY, - RAFT_GRPC_SERVER_PORT_DEFAULT); - int maxMessageSize = properties.getInt( - RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_KEY, - RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT); + private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) { ServerBuilder serverBuilder = ServerBuilder.forPort(port); selfId = raftServer.getId(); server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize) @@ -82,8 +116,7 @@ private void startService() { try { server.start(); } catch (IOException e) { - LOG.error("Failed to start Grpc server", e); - System.exit(1); + ExitUtils.terminate(1, "Failed to start Grpc server", e, LOG); } Runtime.getRuntime().addShutdownHook(new Thread() { @Override diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java index f5c7b3f19b..7a996eb2f6 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java @@ -78,7 +78,10 @@ private static Map initRpcServices( final Map peerRpcs = new HashMap<>(); for (RaftServerImpl s : servers) { - final RaftGRpcService rpc = new RaftGRpcService(s, prop); + final RaftGRpcService rpc = RaftGRpcService.newBuilder() + .setFromRaftProperties(prop) + .setServer(s) + .build(); peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc); } return peerRpcs; @@ -113,7 +116,10 @@ protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException { int oldPort = properties.getInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT); properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, port); - final RaftGRpcService rpc = new RaftGRpcService(server, properties); + final RaftGRpcService rpc = RaftGRpcService.newBuilder() + .setFromRaftProperties(properties) + .setServer(server) + .build(); Preconditions.checkState( rpc.getInetSocketAddress().toString().contains(peer.getAddress()), "address in the raft conf: %s, address in rpc server: %s", diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java index b7ac64acec..4d6979713d 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java @@ -17,9 +17,7 @@ */ package org.apache.ratis.hadooprpc.server; -import java.io.IOException; -import java.net.InetSocketAddress; - +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.ProtobufRpcEngineShaded; import org.apache.hadoop.ipc.RPC; @@ -34,12 +32,7 @@ import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.shaded.com.google.protobuf.BlockingService; import org.apache.ratis.shaded.com.google.protobuf.ServiceException; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftClientProtocolService; import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftServerProtocolService; import org.apache.ratis.util.CodeInjectionForTesting; @@ -48,7 +41,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.net.InetSocketAddress; /** Server side Hadoop RPC service. */ public class HadoopRpcService implements RaftServerRpc { @@ -56,13 +50,44 @@ public class HadoopRpcService implements RaftServerRpc { static final String CLASS_NAME = HadoopRpcService.class.getSimpleName(); public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest"; + public static class Builder extends RaftServerRpc.Builder { + private Configuration conf; + + private Builder() { + super(0); + } + + public Configuration getConf() { + return conf; + } + + public Builder setConf(Configuration conf) { + this.conf = conf; + return this; + } + + @Override + public Builder getThis() { + return this; + } + + @Override + public HadoopRpcService build() throws IOException { + return new HadoopRpcService(getServer(), getConf()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + private final String id; private final RPC.Server ipcServer; private final InetSocketAddress ipcServerAddress; private final PeerProxyMap> proxies; - public HadoopRpcService(RaftServer server, final Configuration conf) + private HadoopRpcService(RaftServer server, final Configuration conf) throws IOException { this.proxies = new PeerProxyMap<>( p -> new Proxy(RaftServerProtocolPB.class, p.getAddress(), conf)); diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java index 964f3a20ad..b8c69d5ebd 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java @@ -74,7 +74,10 @@ private static Map initRpcServices( final Map peerRpcs = new HashMap<>(); for(RaftServerImpl s : servers) { - final HadoopRpcService rpc = new HadoopRpcService(s, hadoopConf); + final HadoopRpcService rpc = HadoopRpcService.newBuilder() + .setServer(s) + .setConf(hadoopConf) + .build(); peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc); } return peerRpcs; @@ -86,7 +89,10 @@ protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException { hconf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, peer.getAddress()); RaftServerImpl server = servers.get(peer.getId()); - final HadoopRpcService rpc = new HadoopRpcService(server, hconf); + final HadoopRpcService rpc = HadoopRpcService.newBuilder() + .setServer(server) + .setConf(hconf) + .build(); Preconditions.checkState( rpc.getInetSocketAddress().toString().contains(peer.getAddress()), "address in the raft conf: %s, address in rpc server: %s", diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index 153f61ec25..b3f2efbd50 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -55,6 +55,27 @@ public final class NettyRpcService implements RaftServerRpc { static final String CLASS_NAME = NettyRpcService.class.getSimpleName(); public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest"; + public static class Builder extends RaftServerRpc.Builder { + private Builder() { + super(0); + } + + @Override + public Builder getThis() { + return this; + } + + @Override + public NettyRpcService build() { + return new NettyRpcService(getServer(), getPort()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + private final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName()); private final RaftServer server; private final String id; @@ -75,7 +96,7 @@ protected void channelRead0(ChannelHandlerContext ctx, RaftNettyServerRequestPro } /** Constructs a netty server with the given port. */ - public NettyRpcService(int port, RaftServer server) { + private NettyRpcService(RaftServer server, int port) { this.server = server; this.id = server.getId(); diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java index 92e7722657..32ed98b265 100644 --- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java @@ -72,7 +72,7 @@ private static NettyRpcService newNettyRpcService( RaftServerImpl s, RaftConfiguration conf) { final String address = getAddress(s.getId(), conf); final int port = NetUtils.newInetSocketAddress(address).getPort(); - return new NettyRpcService(port, s); + return NettyRpcService.newBuilder().setServer(s).setPort(port).build(); } private static Map initRpcServices( diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java index 5fecce31c7..61b3b2e0e7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java @@ -21,6 +21,7 @@ import org.apache.ratis.server.protocol.RaftServerProtocol; import java.io.Closeable; +import java.io.IOException; import java.net.InetSocketAddress; /** @@ -28,6 +29,40 @@ * such as Netty, gRPC and Hadoop. */ public interface RaftServerRpc extends RaftServerProtocol, Closeable { + /** To build {@link RaftServerRpc} objects. */ + abstract class Builder { + private RaftServer server; + private int port; + + /** Construct a builder with the default port. */ + protected Builder(int defaultPort) { + this.port = defaultPort; + } + + public RaftServer getServer() { + return server; + } + + public B setServer(RaftServer server) { + this.server = server; + return getThis(); + } + + public int getPort() { + return port; + } + + /** Set the port for the server to listen to. */ + public B setPort(int port) { + this.port = port; + return getThis(); + } + + protected abstract B getThis(); + + public abstract RPC build() throws IOException; + } + /** Start the RPC service. */ void start();