手写 基于Netty 的 RPC :RPC 需求分析以及基础模块

基于 Netty 的简化版的 RPC主要目的是让大家能够在实践中学习 Netty 的使用和原理,同时对 RPC 的设计也能有一个整体的学习。

配套的源码源代码地址:github.com/sean417/net… 的 netty-rpc 模块。

首先,我们先讨论一下为什么会出现 RPC 技术。

基于 Netty 的简易版 RPC 需求分析

随着分布式和微服务的盛行,给我们的项目带来的收益是不同模块间的解耦,从而使整个软件开发流程更加的灵活。同时,模块间的调用是稀松平常的事情。这就会出现一系列的新的需求:

  • 不同的模块有可能是分布在不同的机器上,要想相互调用一定会涉及到网络传输,所以要有相应的通信模块
  • 其次,网络传输的数据是二进制流。而在面向对象的程序中,业务处理的是对象,这就需要发送方在网络发送之前把对象序列化成二进制流,同时网络接收方收到二进制流后需要把二进制流反序列化成对象。
  • 同时,为了让调用方调用远程服务像调用本地方法一样简单,需要对网络请求、序列化做封装,Java 中一般采用动态代理去实现。
  • 还有,要有注册中心提供服务方的地址列表,同时出现了新的服务节点需要注册中心及时发现,这样调用方才能找到合适的服务方。
  • 最后,还需要负载均衡、熔断、降级、心跳探活等功能。

由于这是一篇专门讲解 Netty 的课程,我们只讲解一个简化版的 RPC 设计,注册中心和负载均衡,及心跳探活等功能就不讲了,主要流程如下:

RPC 执行流程.png

  1. 首先,客户端构建业务对象,比如我们可以初始化 NettyRpcRequest 对象。
  2. 接下来,客户端通过动态代理模块 来调用动态代理方法,用来实现封装 NettyRpcRequest 对象,把要调用的服务和方法,以及方法参数准备通过网络请求发送出去。
  3. 在发送之前通过编码模块转换对象序列化为字节数组。
  4. 动态代理随后会通过网络通信把序列化成字节数组的请求发送给服务端,同时客户端同步或异步等待服务端的响应。这些工作都由动态代理完成,对于调用方来说是无感的
  5. 服务端收到客户端的请求后,把字节数组反序列化成业务对象。
  6. 服务端根据请求中要调用的类和方法,通过反射实例化类并找到对应的方法。
  7. 服务端用收到的参数调用本地方法后封装响应对象。
  8. 把响应对象序列化为字符数组。
  9. 服务端把序列化的响应对象通过网络返回给客户端。
  10. 客户端收到序列化成字节数组的响应后反序列化成响应对象

接下来,我会带领大家把上述流程一一实现。首先,我们先学习一下实现 RPC 项目的两个基础模块。

RPC 基础模块

前置知识分为序列化、动态代理。

序列化模块

对于 RPC 来说,序列化模块是必不可少的,不同模块之间相互调用必须依赖序列化和反序列化。对于 Java 语言来说,序列化的方案很多,包括 Java JDK 自带的序列化、Google 的 Protobuf、FaceBook 的 Thrift、hessian 以及 Avro。我们采用的是性能,易用性都比较好的 Hessian。

Hessian 的依赖配置:

<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.38</version>
</dependency>
<dependency>
    <groupId>com.caucho</groupId>
    <artifactId>hessian</artifactId>
    <version>4.0.38</version>
</dependency>
<dependency> <groupId>com.caucho</groupId> <artifactId>hessian</artifactId> <version>4.0.38</version> </dependency>

现在,我们基于 hessian 来实现一个序列化模块为 RPC 提供序列化和反序列化功能,创建一个序列化和反序列化的类。

public class HessianSerialization {
// 序列化一个对象
public static byte[] serialize(Object object) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
HessianOutput ho = new HessianOutput(byteArrayOutputStream);
ho.writeObject(object);
// 序列化成二进制字节数组
byte[] bytes = byteArrayOutputStream.toByteArray();
return bytes;
}
// 反序列化
public static Object deserialize(byte[] bytes,Class clazz) throws IOException{
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
HessianInput hessianInput = new HessianInput((byteArrayInputStream));
// 反序列化成对象
Object object = hessianInput.readObject(clazz);
return object;
}
}
public class HessianSerialization {


    // 序列化一个对象
    public static byte[] serialize(Object object) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        HessianOutput ho = new HessianOutput(byteArrayOutputStream);
        ho.writeObject(object);
        // 序列化成二进制字节数组
        byte[] bytes = byteArrayOutputStream.toByteArray();
        return bytes;
    }

    // 反序列化
    public static Object deserialize(byte[] bytes,Class clazz) throws IOException{
        ByteArrayInputStream byteArrayInputStream =  new ByteArrayInputStream(bytes);
        HessianInput hessianInput = new HessianInput((byteArrayInputStream));
        // 反序列化成对象
        Object object = hessianInput.readObject(clazz);

        return object;
    }
}
public class HessianSerialization { // 序列化一个对象 public static byte[] serialize(Object object) throws IOException { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); HessianOutput ho = new HessianOutput(byteArrayOutputStream); ho.writeObject(object); // 序列化成二进制字节数组 byte[] bytes = byteArrayOutputStream.toByteArray(); return bytes; } // 反序列化 public static Object deserialize(byte[] bytes,Class clazz) throws IOException{ ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); HessianInput hessianInput = new HessianInput((byteArrayInputStream)); // 反序列化成对象 Object object = hessianInput.readObject(clazz); return object; } }

HessianSerialization 提供了序列化和反序列化的方法。

我们测试一下:

public class HessianSerializationTest {
public static void main(String[] args) throws Exception{
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestId(UUID.randomUUID().toString());
rpcRequest.setClassName("TestClass");
rpcRequest.setMethodName("sayHello()");
rpcRequest.setParameterClasses(new String[]{"String"});
rpcRequest.setParameters(new Object[]{"sean"});
rpcRequest.setInvokerApplicationName("RpcClient");
rpcRequest.setInvokerIp("127.0.0.1");
// 把对象序列化成字节数组
byte[] bytes = HessianSerialization.serialize(rpcRequest);
// 打印字节数组长度
System.out.println(bytes.length);
// 把字节数组反序列化成对象
RpcRequest deserializedRpcRequest =
(RpcRequest) HessianSerialization.deserialize(bytes,RpcRequest.class);
System.out.println(deserializedRpcRequest.toString());
}
}
public class HessianSerializationTest {


    public static void main(String[] args) throws Exception{
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setRequestId(UUID.randomUUID().toString());
        rpcRequest.setClassName("TestClass");
        rpcRequest.setMethodName("sayHello()");
        rpcRequest.setParameterClasses(new String[]{"String"});
        rpcRequest.setParameters(new Object[]{"sean"});
        rpcRequest.setInvokerApplicationName("RpcClient");
        rpcRequest.setInvokerIp("127.0.0.1");


        // 把对象序列化成字节数组
        byte[] bytes = HessianSerialization.serialize(rpcRequest);
        // 打印字节数组长度
        System.out.println(bytes.length);
        
        // 把字节数组反序列化成对象
        RpcRequest deserializedRpcRequest =
                (RpcRequest) HessianSerialization.deserialize(bytes,RpcRequest.class);
        System.out.println(deserializedRpcRequest.toString());
    }
}
public class HessianSerializationTest { public static void main(String[] args) throws Exception{ RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setRequestId(UUID.randomUUID().toString()); rpcRequest.setClassName("TestClass"); rpcRequest.setMethodName("sayHello()"); rpcRequest.setParameterClasses(new String[]{"String"}); rpcRequest.setParameters(new Object[]{"sean"}); rpcRequest.setInvokerApplicationName("RpcClient"); rpcRequest.setInvokerIp("127.0.0.1"); // 把对象序列化成字节数组 byte[] bytes = HessianSerialization.serialize(rpcRequest); // 打印字节数组长度 System.out.println(bytes.length); // 把字节数组反序列化成对象 RpcRequest deserializedRpcRequest = (RpcRequest) HessianSerialization.deserialize(bytes,RpcRequest.class); System.out.println(deserializedRpcRequest.toString()); } }

使用 Java 8 以上的同学在运行时可能会报错,需要加上一下的运行配置:

image.png

运行结果:

image.png

大家可以看到,rpcRequest 对象经过序列化成为 byte 数组,我们再把 byte 数组成功的反序列化为 RpcRequest,也就是说 HessianSerialization 类的序列化和反序列化的功能是没有问题的。

下一步,我们把 HessianSerialization 整合进 Netty 里。

把 HessianSerialization 整合进 Netty 里

对象的序列化和反序列化的需求在于:当我们收到数据的时候需要把二进制的 byte 数组转换为业务对象,这里就需要在 Netty 的 pipeline 中添加 inbound Handler,而对于发送数据则需要把业务对象转换为二进制的 byte 数据,也就是需要在 Netty 的 pipeline 中添加 outbound Handler。

MessageToByteEncoder 如下图所示,MessageToByteEncoder 其实是一个 outbound Handler,所以我们可以继承 MessageToByteEncoder 来把序列化逻辑写进去。

image.png

首先我们看 RpcEncoder:

public class RpcEncoder extends MessageToByteEncoder {
private Class<?> targetClass;
public RpcEncoder(Class<?> targetClass){
this.targetClass = targetClass;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
if(targetClass.isInstance(o)){
// 序列化
byte[] bytes = HessianSerialization.serialize(o);
// 先写入 4 个字节的 长度
byteBuf.writeInt(bytes.length);
// 写入 byte 数组
byteBuf.writeBytes(bytes);
}
}
}
    public class RpcEncoder extends MessageToByteEncoder {
        
        private Class<?> targetClass;
        public RpcEncoder(Class<?> targetClass){
            this.targetClass = targetClass;
        }


        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
                if(targetClass.isInstance(o)){
                    // 序列化
                    byte[] bytes = HessianSerialization.serialize(o);
                    // 先写入 4 个字节的 长度
                    byteBuf.writeInt(bytes.length);
                    // 写入 byte 数组
                    byteBuf.writeBytes(bytes);
                }
        }
    }
public class RpcEncoder extends MessageToByteEncoder { private Class<?> targetClass; public RpcEncoder(Class<?> targetClass){ this.targetClass = targetClass; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { if(targetClass.isInstance(o)){ // 序列化 byte[] bytes = HessianSerialization.serialize(o); // 先写入 4 个字节的 长度 byteBuf.writeInt(bytes.length); // 写入 byte 数组 byteBuf.writeBytes(bytes); } } }

我们在序列化的时候用到了 HessianSerialization,我们先把序列化后的 byte 数组的对象字节大小 bytes.length 以 Int 类型写进 Byte 数组里,然后再把真正的序列化后对象体写进 byte 数组里。于是我们完成了负责序列化的 Handler。

要想在 Netty 中添加反序列化的功能,那一定要在 inbound handler 中添加,同样的我们看下 ByteToMessageDecoder 这个类:

image.png

这个类是一个 Inbound handler,我们可以通过继承它来把反序列化的逻辑写入 Netty。

反序列化类 RpcDecoder:

public class RpcDecoder extends ByteToMessageDecoder {
private static final int MESSAGE_BYTES_LENGTH = 4;
private static final int MESSAGE_LENGTH_NORMAL_LENGTH = 0;
private Class<?> targetClass;
public RpcDecoder(Class<?> targetClass){
this.targetClass = targetClass;
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
//1.校验整体数据长度
if(byteBuf.readableBytes()< MESSAGE_BYTES_LENGTH){
return;
}
// 2.对于 byteBuf 当前可以读的 readerIndex 做一个 mark 标记
// 这样以后可以找到发起 read 之前的 readerIndex 的位置
byteBuf.markReaderIndex();
// 3.读取 4 个字节的 int 类型用来表示消息的 byte 长度。
int messageLength = byteBuf.readInt();
// 4.如果长度小于正常对象长度,就关闭 channel。
if(messageLength < MESSAGE_LENGTH_NORMAL_LENGTH){
channelHandlerContext.close();
}
// 5.此时如果可读字节数小于对象的字节长度
// 本质是检查是否是拆包问题
if(byteBuf.readableBytes() < messageLength){
复位 readerIndex,下次再从对象体开始的地方读。
byteBuf.resetReaderIndex();
return;
}
// 6.读出 byte 数组
byte[] bytes = new byte[messageLength];
byteBuf.readBytes(bytes);
// 7.反序列化
Object object = HessianSerialization.deserialize(bytes, RpcRequest.class);
}
}
    public class RpcDecoder extends ByteToMessageDecoder {


        private static final int MESSAGE_BYTES_LENGTH = 4;

        private static final int MESSAGE_LENGTH_NORMAL_LENGTH = 0;


        private Class<?> targetClass;

        public RpcDecoder(Class<?> targetClass){
            this.targetClass = targetClass;
        }


        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            //1.校验整体数据长度
            if(byteBuf.readableBytes()< MESSAGE_BYTES_LENGTH){
                return;
            }
            // 2.对于 byteBuf 当前可以读的 readerIndex 做一个 mark 标记
            // 这样以后可以找到发起 read 之前的 readerIndex 的位置
            byteBuf.markReaderIndex();

            // 3.读取 4 个字节的 int 类型用来表示消息的 byte 长度。
            int messageLength = byteBuf.readInt();
            // 4.如果长度小于正常对象长度,就关闭 channel。
            if(messageLength < MESSAGE_LENGTH_NORMAL_LENGTH){
                channelHandlerContext.close();
            }

            // 5.此时如果可读字节数小于对象的字节长度
            // 本质是检查是否是拆包问题
            if(byteBuf.readableBytes() < messageLength){
                复位 readerIndex,下次再从对象体开始的地方读。
                byteBuf.resetReaderIndex();
                return;
            }
            // 6.读出 byte 数组
            byte[] bytes = new byte[messageLength];
            byteBuf.readBytes(bytes);
            // 7.反序列化
            Object object = HessianSerialization.deserialize(bytes, RpcRequest.class);
        }
    }
public class RpcDecoder extends ByteToMessageDecoder { private static final int MESSAGE_BYTES_LENGTH = 4; private static final int MESSAGE_LENGTH_NORMAL_LENGTH = 0; private Class<?> targetClass; public RpcDecoder(Class<?> targetClass){ this.targetClass = targetClass; } @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { //1.校验整体数据长度 if(byteBuf.readableBytes()< MESSAGE_BYTES_LENGTH){ return; } // 2.对于 byteBuf 当前可以读的 readerIndex 做一个 mark 标记 // 这样以后可以找到发起 read 之前的 readerIndex 的位置 byteBuf.markReaderIndex(); // 3.读取 4 个字节的 int 类型用来表示消息的 byte 长度。 int messageLength = byteBuf.readInt(); // 4.如果长度小于正常对象长度,就关闭 channel。 if(messageLength < MESSAGE_LENGTH_NORMAL_LENGTH){ channelHandlerContext.close(); } // 5.此时如果可读字节数小于对象的字节长度 // 本质是检查是否是拆包问题 if(byteBuf.readableBytes() < messageLength){ 复位 readerIndex,下次再从对象体开始的地方读。 byteBuf.resetReaderIndex(); return; } // 6.读出 byte 数组 byte[] bytes = new byte[messageLength]; byteBuf.readBytes(bytes); // 7.反序列化 Object object = HessianSerialization.deserialize(bytes, RpcRequest.class); } }

decode 的逻辑比较复杂,我会在后面给大家专门开一节课专门介绍粘包拆包的解决方案。

动态代理

动态代理的作用其实就是把一些通用的逻辑写在代理类里,而不用在业务代码里在充分的写这些通用逻辑,而且能够做到通用。比如,监控,安全验证等通用的逻辑就可以写在代理类里,然后用代理类去代理一些业务类的方法,这样调用业务类的方法时,业务的方法会自动调用通用逻辑代码,而不会在每个逻辑业务中再重复的写同样的通用逻辑。

这样做的好处是很明显的,一方面让我们的代码看起来更加的优雅,同时实现了业务逻辑和通用逻辑的解耦。

对于 Java 语言来说,动态代理的方案也比较多,我们为了简化就直接用了 Java 语言自带的动态代理模块,各种动态代理的解决方案的主要功能都基本差不多,只是性能和灵活性的区别。

现在,我们学习一下 Java 语言自带的动态代理模块。对应的类是 Proxy,这个类可以给指定的接口创建代理类。我们直接去看它的核心方法:

public static Object newProxyInstance(ClassLoader loader,
Class<?>[] interfaces,
InvocationHandler h)
public static Object newProxyInstance(ClassLoader loader,
                                      Class<?>[] interfaces,
                                      InvocationHandler h)
public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h)
  • 参数 interfaces:表示需要被代理的接口,可以是个接口数组,也就是说可以给多个接口创建代理类。
  • 参数 h:InvocationHandler 类型是真正实现代理逻辑的类,我们可以在这个类里添加公共逻辑,比如网络请求等。

我们在 Proxy 类的基础上封装了一个代理模块:

// 代理服务模块
public class ServiceProxy {
public static Object createProxy(ReferenceConfig referenceConfig){
// 构建代理实例
return Proxy.newProxyInstance(
ServiceProxy.class.getClassLoader(),
// 需要代理的接口
new Class[]{referenceConfig.getServiceInterfaceClass()},
new ServiceProxyInvocationHandler(referenceConfig));
}
//具体的代理逻辑
static class ServiceProxyInvocationHandler implements InvocationHandler{
private ReferenceConfig referenceConfig;
public ServiceProxyInvocationHandler(ReferenceConfig referenceConfig){
this.referenceConfig = referenceConfig;
}
// 通过动态代理改写类的方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
NettyRpcClient nettyRpcClient = new NettyRpcClient(referenceConfig);
// 连接服务端
nettyRpcClient.connect();
RpcRequest rpcRequest = new RpcRequest();
// 配置请求
rpcRequest.setRequestId(UUID.randomUUID().toString());
rpcRequest.setServiceInterfaceClass(referenceConfig.getServiceInterfaceClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setArgs(args);
// 远程调用
RpcResponse rpcResponse = nettyRpcClient.remoteCall(rpcRequest);
return rpcResponse.getResult();
}
}
}
// 代理服务模块
public class ServiceProxy {


    public static Object createProxy(ReferenceConfig referenceConfig){
            // 构建代理实例
            return Proxy.newProxyInstance(
                    ServiceProxy.class.getClassLoader(),
                    // 需要代理的接口
                    new Class[]{referenceConfig.getServiceInterfaceClass()},
                    new ServiceProxyInvocationHandler(referenceConfig));
    }



    //具体的代理逻辑
    static class ServiceProxyInvocationHandler implements InvocationHandler{

        private ReferenceConfig referenceConfig;

        public ServiceProxyInvocationHandler(ReferenceConfig referenceConfig){
            this.referenceConfig = referenceConfig;
        }
        // 通过动态代理改写类的方法
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            NettyRpcClient nettyRpcClient = new NettyRpcClient(referenceConfig);
            // 连接服务端
            nettyRpcClient.connect();
            RpcRequest rpcRequest = new RpcRequest();
            // 配置请求
            rpcRequest.setRequestId(UUID.randomUUID().toString());
            rpcRequest.setServiceInterfaceClass(referenceConfig.getServiceInterfaceClass().getName());
            rpcRequest.setMethodName(method.getName());
            rpcRequest.setParameterTypes(method.getParameterTypes());
            rpcRequest.setArgs(args);
            // 远程调用
            RpcResponse  rpcResponse = nettyRpcClient.remoteCall(rpcRequest);

            return rpcResponse.getResult();
        }
    }
}
// 代理服务模块 public class ServiceProxy { public static Object createProxy(ReferenceConfig referenceConfig){ // 构建代理实例 return Proxy.newProxyInstance( ServiceProxy.class.getClassLoader(), // 需要代理的接口 new Class[]{referenceConfig.getServiceInterfaceClass()}, new ServiceProxyInvocationHandler(referenceConfig)); } //具体的代理逻辑 static class ServiceProxyInvocationHandler implements InvocationHandler{ private ReferenceConfig referenceConfig; public ServiceProxyInvocationHandler(ReferenceConfig referenceConfig){ this.referenceConfig = referenceConfig; } // 通过动态代理改写类的方法 @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { NettyRpcClient nettyRpcClient = new NettyRpcClient(referenceConfig); // 连接服务端 nettyRpcClient.connect(); RpcRequest rpcRequest = new RpcRequest(); // 配置请求 rpcRequest.setRequestId(UUID.randomUUID().toString()); rpcRequest.setServiceInterfaceClass(referenceConfig.getServiceInterfaceClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameterTypes(method.getParameterTypes()); rpcRequest.setArgs(args); // 远程调用 RpcResponse rpcResponse = nettyRpcClient.remoteCall(rpcRequest); return rpcResponse.getResult(); } } }

首先,通过 ServiceProxy 的构造方法传入我们需要远程调用的元数据 referenceConfig 类对象:

image.png

包括超时时间、服务方的 IP 和 PORT、需要被代理的类文件。

同时,把 referenceConfig 传入 ServiceProxyInvocationHandler 里,这样在方法 invoke 中就可以实现网络请求的逻辑了。

在 invode() 方法里主要实现了以下几个步骤:

  1. 客户端向服务端发起网络连接。
  2. 对请求对象 rpcRequest 的初始化和配置,包括调用接口、调用方法,以及调用方法的参数。
  3. 然后,发起远程调用。
  4. 等待服务端的响应,并返回响应结果。

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYTkGYdN' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片