基于 Netty 的简化版的 RPC
主要目的是让大家能够在实践中学习 Netty 的使用和原理,同时对 RPC 的设计也能有一个整体的学习。
配套的源码源代码地址:github.com/sean417/net… 的 netty-rpc 模块。
首先,我们先讨论一下为什么会出现 RPC 技术。
基于 Netty 的简易版 RPC 需求分析
随着分布式和微服务的盛行,给我们的项目带来的收益是不同模块间的解耦,从而使整个软件开发流程更加的灵活。同时,模块间的调用是稀松平常的事情。这就会出现一系列的新的需求:
- 不同的模块有可能是分布在不同的机器上,要想相互调用一定会涉及到网络传输,所以要有相应的
通信模块
。 - 其次,网络传输的数据是二进制流。而在面向对象的程序中,业务处理的是对象,这就需要发送方在网络发送之前把对象
序列化
成二进制流,同时网络接收方收到二进制流后需要把二进制流反序列化
成对象。 - 同时,为了让调用方调用远程服务像调用本地方法一样简单,需要对网络请求、序列化做封装,Java 中一般采用
动态代理
去实现。 - 还有,要有
注册中心
提供服务方的地址列表,同时出现了新的服务节点需要注册中心及时发现,这样调用方才能找到合适的服务方。 - 最后,还需要负载均衡、熔断、降级、心跳探活等功能。
由于这是一篇专门讲解 Netty 的课程,我们只讲解一个简化版的 RPC 设计,注册中心和负载均衡,及心跳探活等功能就不讲了,主要流程如下:
- 首先,客户端构建业务对象,比如我们可以初始化 NettyRpcRequest 对象。
- 接下来,客户端通过
动态代理模块
来调用动态代理方法,用来实现封装 NettyRpcRequest 对象,把要调用的服务和方法,以及方法参数准备通过网络请求发送出去。 - 在发送之前通过编码模块转换
对象序列化
为字节数组。 动态代理
随后会通过网络通信把序列化成字节数组的请求发送给服务端,同时客户端同步或异步等待服务端的响应。这些工作都由动态代理完成,对于调用方来说是无感的
。- 服务端收到客户端的请求后,把字节数组
反序列化
成业务对象。 - 服务端根据请求中要调用的类和方法,通过反射实例化类并找到对应的方法。
- 服务端用收到的参数调用本地方法后封装响应对象。
- 把响应对象序列化为字符数组。
- 服务端把序列化的响应对象通过网络返回给客户端。
- 客户端收到序列化成字节数组的响应后
反序列化成响应对象
。
接下来,我会带领大家把上述流程一一实现。首先,我们先学习一下实现 RPC 项目的两个基础模块。
RPC 基础模块
前置知识分为序列化、动态代理。
序列化模块
对于 RPC 来说,序列化模块是必不可少的,不同模块之间相互调用必须依赖序列化和反序列化。对于 Java 语言来说,序列化的方案很多,包括 Java JDK 自带的序列化、Google 的 Protobuf、FaceBook 的 Thrift、hessian 以及 Avro。我们采用的是性能,易用性都比较好的 Hessian。
Hessian 的依赖配置:
<dependency><groupId>com.caucho</groupId><artifactId>hessian</artifactId><version>4.0.38</version></dependency><dependency> <groupId>com.caucho</groupId> <artifactId>hessian</artifactId> <version>4.0.38</version> </dependency><dependency> <groupId>com.caucho</groupId> <artifactId>hessian</artifactId> <version>4.0.38</version> </dependency>
现在,我们基于 hessian 来实现一个序列化模块为 RPC 提供序列化和反序列化功能,创建一个序列化和反序列化的类。
public class HessianSerialization {// 序列化一个对象public static byte[] serialize(Object object) throws IOException {ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();HessianOutput ho = new HessianOutput(byteArrayOutputStream);ho.writeObject(object);// 序列化成二进制字节数组byte[] bytes = byteArrayOutputStream.toByteArray();return bytes;}// 反序列化public static Object deserialize(byte[] bytes,Class clazz) throws IOException{ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);HessianInput hessianInput = new HessianInput((byteArrayInputStream));// 反序列化成对象Object object = hessianInput.readObject(clazz);return object;}}public class HessianSerialization { // 序列化一个对象 public static byte[] serialize(Object object) throws IOException { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); HessianOutput ho = new HessianOutput(byteArrayOutputStream); ho.writeObject(object); // 序列化成二进制字节数组 byte[] bytes = byteArrayOutputStream.toByteArray(); return bytes; } // 反序列化 public static Object deserialize(byte[] bytes,Class clazz) throws IOException{ ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); HessianInput hessianInput = new HessianInput((byteArrayInputStream)); // 反序列化成对象 Object object = hessianInput.readObject(clazz); return object; } }public class HessianSerialization { // 序列化一个对象 public static byte[] serialize(Object object) throws IOException { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); HessianOutput ho = new HessianOutput(byteArrayOutputStream); ho.writeObject(object); // 序列化成二进制字节数组 byte[] bytes = byteArrayOutputStream.toByteArray(); return bytes; } // 反序列化 public static Object deserialize(byte[] bytes,Class clazz) throws IOException{ ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); HessianInput hessianInput = new HessianInput((byteArrayInputStream)); // 反序列化成对象 Object object = hessianInput.readObject(clazz); return object; } }
HessianSerialization 提供了序列化和反序列化的方法。
我们测试一下:
public class HessianSerializationTest {public static void main(String[] args) throws Exception{RpcRequest rpcRequest = new RpcRequest();rpcRequest.setRequestId(UUID.randomUUID().toString());rpcRequest.setClassName("TestClass");rpcRequest.setMethodName("sayHello()");rpcRequest.setParameterClasses(new String[]{"String"});rpcRequest.setParameters(new Object[]{"sean"});rpcRequest.setInvokerApplicationName("RpcClient");rpcRequest.setInvokerIp("127.0.0.1");// 把对象序列化成字节数组byte[] bytes = HessianSerialization.serialize(rpcRequest);// 打印字节数组长度System.out.println(bytes.length);// 把字节数组反序列化成对象RpcRequest deserializedRpcRequest =(RpcRequest) HessianSerialization.deserialize(bytes,RpcRequest.class);System.out.println(deserializedRpcRequest.toString());}}public class HessianSerializationTest { public static void main(String[] args) throws Exception{ RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setRequestId(UUID.randomUUID().toString()); rpcRequest.setClassName("TestClass"); rpcRequest.setMethodName("sayHello()"); rpcRequest.setParameterClasses(new String[]{"String"}); rpcRequest.setParameters(new Object[]{"sean"}); rpcRequest.setInvokerApplicationName("RpcClient"); rpcRequest.setInvokerIp("127.0.0.1"); // 把对象序列化成字节数组 byte[] bytes = HessianSerialization.serialize(rpcRequest); // 打印字节数组长度 System.out.println(bytes.length); // 把字节数组反序列化成对象 RpcRequest deserializedRpcRequest = (RpcRequest) HessianSerialization.deserialize(bytes,RpcRequest.class); System.out.println(deserializedRpcRequest.toString()); } }public class HessianSerializationTest { public static void main(String[] args) throws Exception{ RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setRequestId(UUID.randomUUID().toString()); rpcRequest.setClassName("TestClass"); rpcRequest.setMethodName("sayHello()"); rpcRequest.setParameterClasses(new String[]{"String"}); rpcRequest.setParameters(new Object[]{"sean"}); rpcRequest.setInvokerApplicationName("RpcClient"); rpcRequest.setInvokerIp("127.0.0.1"); // 把对象序列化成字节数组 byte[] bytes = HessianSerialization.serialize(rpcRequest); // 打印字节数组长度 System.out.println(bytes.length); // 把字节数组反序列化成对象 RpcRequest deserializedRpcRequest = (RpcRequest) HessianSerialization.deserialize(bytes,RpcRequest.class); System.out.println(deserializedRpcRequest.toString()); } }
使用 Java 8 以上的同学在运行时可能会报错,需要加上一下的运行配置:
运行结果:
大家可以看到,rpcRequest 对象经过序列化成为 byte 数组,我们再把 byte 数组成功的反序列化为 RpcRequest,也就是说 HessianSerialization 类的序列化和反序列化的功能是没有问题的。
下一步,我们把 HessianSerialization 整合进 Netty 里。
把 HessianSerialization 整合进 Netty 里
对象的序列化和反序列化的需求在于:当我们收到数据的时候需要把二进制的 byte 数组转换为业务对象,这里就需要在 Netty 的 pipeline 中添加 inbound Handler,而对于发送数据则需要把业务对象转换为二进制的 byte 数据,也就是需要在 Netty 的 pipeline 中添加 outbound Handler。
MessageToByteEncoder 如下图所示,MessageToByteEncoder 其实是一个 outbound Handler,所以我们可以继承 MessageToByteEncoder 来把序列化逻辑写进去。
首先我们看 RpcEncoder:
public class RpcEncoder extends MessageToByteEncoder {private Class<?> targetClass;public RpcEncoder(Class<?> targetClass){this.targetClass = targetClass;}@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {if(targetClass.isInstance(o)){// 序列化byte[] bytes = HessianSerialization.serialize(o);// 先写入 4 个字节的 长度byteBuf.writeInt(bytes.length);// 写入 byte 数组byteBuf.writeBytes(bytes);}}}public class RpcEncoder extends MessageToByteEncoder { private Class<?> targetClass; public RpcEncoder(Class<?> targetClass){ this.targetClass = targetClass; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { if(targetClass.isInstance(o)){ // 序列化 byte[] bytes = HessianSerialization.serialize(o); // 先写入 4 个字节的 长度 byteBuf.writeInt(bytes.length); // 写入 byte 数组 byteBuf.writeBytes(bytes); } } }public class RpcEncoder extends MessageToByteEncoder { private Class<?> targetClass; public RpcEncoder(Class<?> targetClass){ this.targetClass = targetClass; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { if(targetClass.isInstance(o)){ // 序列化 byte[] bytes = HessianSerialization.serialize(o); // 先写入 4 个字节的 长度 byteBuf.writeInt(bytes.length); // 写入 byte 数组 byteBuf.writeBytes(bytes); } } }
我们在序列化的时候用到了 HessianSerialization,我们先把序列化后的 byte 数组的对象字节大小 bytes.length 以 Int 类型写进 Byte 数组里,然后再把真正的序列化后对象体写进 byte 数组里。于是我们完成了负责序列化的 Handler。
要想在 Netty 中添加反序列化的功能,那一定要在 inbound handler 中添加,同样的我们看下 ByteToMessageDecoder 这个类:
这个类是一个 Inbound handler,我们可以通过继承它来把反序列化的逻辑写入 Netty。
反序列化类 RpcDecoder:
public class RpcDecoder extends ByteToMessageDecoder {private static final int MESSAGE_BYTES_LENGTH = 4;private static final int MESSAGE_LENGTH_NORMAL_LENGTH = 0;private Class<?> targetClass;public RpcDecoder(Class<?> targetClass){this.targetClass = targetClass;}@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {//1.校验整体数据长度if(byteBuf.readableBytes()< MESSAGE_BYTES_LENGTH){return;}// 2.对于 byteBuf 当前可以读的 readerIndex 做一个 mark 标记// 这样以后可以找到发起 read 之前的 readerIndex 的位置byteBuf.markReaderIndex();// 3.读取 4 个字节的 int 类型用来表示消息的 byte 长度。int messageLength = byteBuf.readInt();// 4.如果长度小于正常对象长度,就关闭 channel。if(messageLength < MESSAGE_LENGTH_NORMAL_LENGTH){channelHandlerContext.close();}// 5.此时如果可读字节数小于对象的字节长度// 本质是检查是否是拆包问题if(byteBuf.readableBytes() < messageLength){复位 readerIndex,下次再从对象体开始的地方读。byteBuf.resetReaderIndex();return;}// 6.读出 byte 数组byte[] bytes = new byte[messageLength];byteBuf.readBytes(bytes);// 7.反序列化Object object = HessianSerialization.deserialize(bytes, RpcRequest.class);}}public class RpcDecoder extends ByteToMessageDecoder { private static final int MESSAGE_BYTES_LENGTH = 4; private static final int MESSAGE_LENGTH_NORMAL_LENGTH = 0; private Class<?> targetClass; public RpcDecoder(Class<?> targetClass){ this.targetClass = targetClass; } @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { //1.校验整体数据长度 if(byteBuf.readableBytes()< MESSAGE_BYTES_LENGTH){ return; } // 2.对于 byteBuf 当前可以读的 readerIndex 做一个 mark 标记 // 这样以后可以找到发起 read 之前的 readerIndex 的位置 byteBuf.markReaderIndex(); // 3.读取 4 个字节的 int 类型用来表示消息的 byte 长度。 int messageLength = byteBuf.readInt(); // 4.如果长度小于正常对象长度,就关闭 channel。 if(messageLength < MESSAGE_LENGTH_NORMAL_LENGTH){ channelHandlerContext.close(); } // 5.此时如果可读字节数小于对象的字节长度 // 本质是检查是否是拆包问题 if(byteBuf.readableBytes() < messageLength){ 复位 readerIndex,下次再从对象体开始的地方读。 byteBuf.resetReaderIndex(); return; } // 6.读出 byte 数组 byte[] bytes = new byte[messageLength]; byteBuf.readBytes(bytes); // 7.反序列化 Object object = HessianSerialization.deserialize(bytes, RpcRequest.class); } }public class RpcDecoder extends ByteToMessageDecoder { private static final int MESSAGE_BYTES_LENGTH = 4; private static final int MESSAGE_LENGTH_NORMAL_LENGTH = 0; private Class<?> targetClass; public RpcDecoder(Class<?> targetClass){ this.targetClass = targetClass; } @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { //1.校验整体数据长度 if(byteBuf.readableBytes()< MESSAGE_BYTES_LENGTH){ return; } // 2.对于 byteBuf 当前可以读的 readerIndex 做一个 mark 标记 // 这样以后可以找到发起 read 之前的 readerIndex 的位置 byteBuf.markReaderIndex(); // 3.读取 4 个字节的 int 类型用来表示消息的 byte 长度。 int messageLength = byteBuf.readInt(); // 4.如果长度小于正常对象长度,就关闭 channel。 if(messageLength < MESSAGE_LENGTH_NORMAL_LENGTH){ channelHandlerContext.close(); } // 5.此时如果可读字节数小于对象的字节长度 // 本质是检查是否是拆包问题 if(byteBuf.readableBytes() < messageLength){ 复位 readerIndex,下次再从对象体开始的地方读。 byteBuf.resetReaderIndex(); return; } // 6.读出 byte 数组 byte[] bytes = new byte[messageLength]; byteBuf.readBytes(bytes); // 7.反序列化 Object object = HessianSerialization.deserialize(bytes, RpcRequest.class); } }
decode 的逻辑比较复杂,我会在后面给大家专门开一节课专门介绍粘包拆包的解决方案。
动态代理
动态代理的作用其实就是把一些通用的逻辑写在代理类里,而不用在业务代码里在充分的写这些通用逻辑,而且能够做到通用。比如,监控,安全验证等通用的逻辑就可以写在代理类里,然后用代理类去代理一些业务类的方法,这样调用业务类的方法时,业务的方法会自动调用通用逻辑代码,而不会在每个逻辑业务中再重复的写同样的通用逻辑。
这样做的好处是很明显的,一方面让我们的代码看起来更加的优雅,同时实现了业务逻辑和通用逻辑的解耦。
对于 Java 语言来说,动态代理的方案也比较多,我们为了简化就直接用了 Java 语言自带的动态代理模块,各种动态代理的解决方案的主要功能都基本差不多,只是性能和灵活性的区别。
现在,我们学习一下 Java 语言自带的动态代理模块。对应的类是 Proxy,这个类可以给指定的接口创建代理类。我们直接去看它的核心方法:
public static Object newProxyInstance(ClassLoader loader,Class<?>[] interfaces,InvocationHandler h)public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h)public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h)
- 参数 interfaces:表示需要被代理的接口,可以是个接口数组,也就是说可以给多个接口创建代理类。
- 参数 h:InvocationHandler 类型是真正实现代理逻辑的类,我们可以在这个类里添加公共逻辑,比如网络请求等。
我们在 Proxy 类的基础上封装了一个代理模块:
// 代理服务模块public class ServiceProxy {public static Object createProxy(ReferenceConfig referenceConfig){// 构建代理实例return Proxy.newProxyInstance(ServiceProxy.class.getClassLoader(),// 需要代理的接口new Class[]{referenceConfig.getServiceInterfaceClass()},new ServiceProxyInvocationHandler(referenceConfig));}//具体的代理逻辑static class ServiceProxyInvocationHandler implements InvocationHandler{private ReferenceConfig referenceConfig;public ServiceProxyInvocationHandler(ReferenceConfig referenceConfig){this.referenceConfig = referenceConfig;}// 通过动态代理改写类的方法@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {NettyRpcClient nettyRpcClient = new NettyRpcClient(referenceConfig);// 连接服务端nettyRpcClient.connect();RpcRequest rpcRequest = new RpcRequest();// 配置请求rpcRequest.setRequestId(UUID.randomUUID().toString());rpcRequest.setServiceInterfaceClass(referenceConfig.getServiceInterfaceClass().getName());rpcRequest.setMethodName(method.getName());rpcRequest.setParameterTypes(method.getParameterTypes());rpcRequest.setArgs(args);// 远程调用RpcResponse rpcResponse = nettyRpcClient.remoteCall(rpcRequest);return rpcResponse.getResult();}}}// 代理服务模块 public class ServiceProxy { public static Object createProxy(ReferenceConfig referenceConfig){ // 构建代理实例 return Proxy.newProxyInstance( ServiceProxy.class.getClassLoader(), // 需要代理的接口 new Class[]{referenceConfig.getServiceInterfaceClass()}, new ServiceProxyInvocationHandler(referenceConfig)); } //具体的代理逻辑 static class ServiceProxyInvocationHandler implements InvocationHandler{ private ReferenceConfig referenceConfig; public ServiceProxyInvocationHandler(ReferenceConfig referenceConfig){ this.referenceConfig = referenceConfig; } // 通过动态代理改写类的方法 @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { NettyRpcClient nettyRpcClient = new NettyRpcClient(referenceConfig); // 连接服务端 nettyRpcClient.connect(); RpcRequest rpcRequest = new RpcRequest(); // 配置请求 rpcRequest.setRequestId(UUID.randomUUID().toString()); rpcRequest.setServiceInterfaceClass(referenceConfig.getServiceInterfaceClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameterTypes(method.getParameterTypes()); rpcRequest.setArgs(args); // 远程调用 RpcResponse rpcResponse = nettyRpcClient.remoteCall(rpcRequest); return rpcResponse.getResult(); } } }// 代理服务模块 public class ServiceProxy { public static Object createProxy(ReferenceConfig referenceConfig){ // 构建代理实例 return Proxy.newProxyInstance( ServiceProxy.class.getClassLoader(), // 需要代理的接口 new Class[]{referenceConfig.getServiceInterfaceClass()}, new ServiceProxyInvocationHandler(referenceConfig)); } //具体的代理逻辑 static class ServiceProxyInvocationHandler implements InvocationHandler{ private ReferenceConfig referenceConfig; public ServiceProxyInvocationHandler(ReferenceConfig referenceConfig){ this.referenceConfig = referenceConfig; } // 通过动态代理改写类的方法 @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { NettyRpcClient nettyRpcClient = new NettyRpcClient(referenceConfig); // 连接服务端 nettyRpcClient.connect(); RpcRequest rpcRequest = new RpcRequest(); // 配置请求 rpcRequest.setRequestId(UUID.randomUUID().toString()); rpcRequest.setServiceInterfaceClass(referenceConfig.getServiceInterfaceClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameterTypes(method.getParameterTypes()); rpcRequest.setArgs(args); // 远程调用 RpcResponse rpcResponse = nettyRpcClient.remoteCall(rpcRequest); return rpcResponse.getResult(); } } }
首先,通过 ServiceProxy 的构造方法传入我们需要远程调用的元数据 referenceConfig 类对象:
包括超时时间、服务方的 IP 和 PORT、需要被代理的类文件。
同时,把 referenceConfig 传入 ServiceProxyInvocationHandler 里,这样在方法 invoke 中就可以实现网络请求的逻辑了。
在 invode() 方法里主要实现了以下几个步骤:
- 客户端向服务端发起网络连接。
- 对请求对象 rpcRequest 的初始化和配置,包括调用接口、调用方法,以及调用方法的参数。
- 然后,发起远程调用。
- 等待服务端的响应,并返回响应结果。