Skip to content

Commit

Permalink
[v2.0] 为每个请求加上了请求号,客户端在调用后会检查响应与请求的请求号
Browse files Browse the repository at this point in the history
  • Loading branch information
CN-GuoZiyang committed Jun 18, 2020
1 parent 3b9459a commit eabc07f
Show file tree
Hide file tree
Showing 13 changed files with 68 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
@AllArgsConstructor
public class RpcRequest implements Serializable {

/**
* 请求号
*/
private String requestId;
/**
* 待调用接口名称
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
@Data
public class RpcResponse<T> implements Serializable {

/**
* 响应对应的请求号
*/
private String requestId;
/**
* 响应状态码
*/
Expand All @@ -29,15 +33,17 @@ public class RpcResponse<T> implements Serializable {
public RpcResponse() {
}

public static <T> RpcResponse<T> success(T data) {
public static <T> RpcResponse<T> success(T data, String requestId) {
RpcResponse<T> response = new RpcResponse<>();
response.setRequestId(requestId);
response.setStatusCode(ResponseCode.SUCCESS.getCode());
response.setData(data);
return response;
}

public static <T> RpcResponse<T> fail(ResponseCode code) {
public static <T> RpcResponse<T> fail(ResponseCode code, String requestId) {
RpcResponse<T> response = new RpcResponse<>();
response.setRequestId(requestId);
response.setStatusCode(code.getCode());
response.setMessage(code.getMessage());
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ public enum RpcError {
UNKNOWN_PROTOCOL("不识别的协议包"),
UNKNOWN_SERIALIZER("不识别的(反)序列化器"),
UNKNOWN_PACKAGE_TYPE("不识别的数据包类型"),
SERIALIZER_NOT_FOUND("找不到序列化器");
SERIALIZER_NOT_FOUND("找不到序列化器"),
RESPONSE_NOT_MATCH("响应与请求号不匹配");

private final String message;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package top.guoziyang.rpc.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.guoziyang.rpc.entity.RpcRequest;
import top.guoziyang.rpc.entity.RpcResponse;
import top.guoziyang.rpc.enumeration.ResponseCode;
import top.guoziyang.rpc.enumeration.RpcError;
import top.guoziyang.rpc.exception.RpcException;

/**
* 检查响应与请求
*
* @author ziyang
*/
public class RpcMessageChecker {

public static final String INTERFACE_NAME = "interfaceName";
private static final Logger logger = LoggerFactory.getLogger(RpcMessageChecker.class);

private RpcMessageChecker() {
}

public static void check(RpcRequest rpcRequest, RpcResponse rpcResponse) {
if (rpcResponse == null) {
logger.error("调用服务失败,serviceName:{}", rpcRequest.getInterfaceName());
throw new RpcException(RpcError.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}

if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) {
throw new RpcException(RpcError.RESPONSE_NOT_MATCH, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}

if (rpcResponse.getStatusCode() == null || !rpcResponse.getStatusCode().equals(ResponseCode.SUCCESS.getCode())) {
logger.error("调用服务失败,serviceName:{},RpcResponse:{}", rpcRequest.getInterfaceName(), rpcResponse);
throw new RpcException(RpcError.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) throws
try {
method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
} catch (NoSuchMethodException e) {
return RpcResponse.fail(ResponseCode.METHOD_NOT_FOUND);
return RpcResponse.fail(ResponseCode.METHOD_NOT_FOUND, rpcRequest.getRequestId());
}
return method.invoke(service, rpcRequest.getParameters());
}
Expand Down
5 changes: 3 additions & 2 deletions rpc-core/src/main/java/top/guoziyang/rpc/RpcClientProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;

/**
* RPC客户端动态代理
Expand All @@ -29,9 +30,9 @@ public <T> T getProxy(Class<T> clazz) {
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
public Object invoke(Object proxy, Method method, Object[] args) {
logger.info("调用方法: {}#{}", method.getDeclaringClass().getName(), method.getName());
RpcRequest rpcRequest = new RpcRequest(method.getDeclaringClass().getName(),
RpcRequest rpcRequest = new RpcRequest(UUID.randomUUID().toString(), method.getDeclaringClass().getName(),
method.getName(), args, method.getParameterTypes());
return client.sendRequest(rpcRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import top.guoziyang.rpc.exception.RpcException;
import top.guoziyang.rpc.serializer.CommonSerializer;
import top.guoziyang.rpc.serializer.KryoSerializer;
import top.guoziyang.rpc.util.RpcMessageChecker;

/**
* NIO方式消费侧客户端类
Expand Down Expand Up @@ -74,8 +75,9 @@ protected void initChannel(SocketChannel ch) {
}
});
channel.closeFuture().sync();
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse" + rpcRequest.getRequestId());
RpcResponse rpcResponse = channel.attr(key).get();
RpcMessageChecker.check(rpcRequest, rpcResponse);
return rpcResponse.getData();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse>
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
try {
logger.info(String.format("客户端接收到消息: %s", msg));
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse" + msg.getRequestId());
ctx.channel().attr(key).set(msg);
ctx.channel().close();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Ex
String interfaceName = msg.getInterfaceName();
Object service = serviceRegistry.getService(interfaceName);
Object result = requestHandler.handle(msg, service);
ChannelFuture future = ctx.writeAndFlush(RpcResponse.success(result));
ChannelFuture future = ctx.writeAndFlush(RpcResponse.success(result, msg.getRequestId()));
future.addListener(ChannelFutureListener.CLOSE);
} finally {
ReferenceCountUtil.release(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import top.guoziyang.rpc.serializer.CommonSerializer;
import top.guoziyang.rpc.socket.util.ObjectReader;
import top.guoziyang.rpc.socket.util.ObjectWriter;
import top.guoziyang.rpc.util.RpcMessageChecker;

import java.io.*;
import java.net.Socket;
Expand Down Expand Up @@ -54,6 +55,7 @@ public Object sendRequest(RpcRequest rpcRequest) {
logger.error("调用服务失败, service: {}, response:{}", rpcRequest.getInterfaceName(), rpcResponse);
throw new RpcException(RpcError.SERVICE_INVOCATION_FAILURE, " service:" + rpcRequest.getInterfaceName());
}
RpcMessageChecker.check(rpcRequest, rpcResponse);
return rpcResponse.getData();
} catch (IOException e) {
logger.error("调用时有错误发生:", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void run() {
String interfaceName = rpcRequest.getInterfaceName();
Object service = serviceRegistry.getService(interfaceName);
Object result = requestHandler.handle(rpcRequest, service);
RpcResponse<Object> response = RpcResponse.success(result);
RpcResponse<Object> response = RpcResponse.success(result, rpcRequest.getRequestId());
ObjectWriter.writeObject(outputStream, response, serializer);
} catch (IOException e) {
logger.error("调用或发送时有错误发生:", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
public class SocketTestClient {

public static void main(String[] args) {
SocketClient client = new SocketClient("127.0.0.1", 9000);
SocketClient client = new SocketClient("127.0.0.1", 9999);
client.setSerializer(new KryoSerializer());
RpcClientProxy proxy = new RpcClientProxy(client);
HelloService helloService = proxy.getProxy(HelloService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static void main(String[] args) {
serviceRegistry.register(helloService);
SocketServer socketServer = new SocketServer(serviceRegistry);
socketServer.setSerializer(new HessianSerializer());
socketServer.start(9000);
socketServer.start(9999);
}

}

0 comments on commit eabc07f

Please sign in to comment.