使用Netty和ZK实现一个简单的RPC框架(服务端实现)

1.服务端

1.服务注册

@Configuration
public class NettyServerStart {
    @Autowired
    private ServiceRegistrationScan registrationScan;
    
    @Autowired
    private NettyServer nettyServer;
    

    @PostConstruct
    public void start() {
        //服务注册
        registrationScan.scan();
        //服务器启动
        nettyServer.start();
    }
}

@PostConstruct注解的作用是在依赖注入完成后,执行被注解的方法。

深入ServiceRegistrationScan

@Component


public class ServiceRegistrationScan implements ApplicationContextAware {
    @Autowired
    private ServerZKit serverZKit;

    @Autowired
    private RpcServerConfiguration rpcServerConfiguration;
    

    //把spring容器注入进来因为会用到它去拿我们需要暴露服务的类
    private static ApplicationContext context;

    public void scan() {
        //创建一个根节点
        serverZKit.createRootNode();
        //拿到所有实现这个接口的类
        Map<String, Object> beansWithAnnotation = context.getBeansWithAnnotation(MoXiuRpcServer.class);
        if (!beansWithAnnotation.isEmpty()) {
            //拿到当前服务器的IP地址
            String realIp = IpUtil.getRealIp();
            for (Object value : beansWithAnnotation.values()) {
                MoXiuRpcServer annotation = value.getClass().getAnnotation(MoXiuRpcServer.class);
                //拿到类似于这样的  com.moxiu.xxx
                Class aClass = annotation.interfaceName();
                String s = aClass.getName();
                //以这个接口创建一个结点
                serverZKit.createPersistentNode(s);
                String childName = s + "/" + realIp +":"+ rpcServerConfiguration.getRpcPort();
                //创建本机IP地址加端口
                serverZKit.createNode(childName);
            }
        }
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    public static <T> Class<T> getBean(String beanName) {
        return (Class<T>) context.getBean(beanName);
    }

}

以上代码为注入spring容器拿到所有实现了@MoXiuRpcServer注解的实现类,把该类注册到ZK中。

这个接口是什么呢?

@Component


@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface MoXiuRpcServer {
    Class interfaceName() default void.class; //实现类需要暴露的接口名字
}

怎么使用呢

@MoXiuRpcServer(interfaceName = UserService.class)
public class UserServiceImpl implements UserService {




    @Override


    public String getUser() {
        return null;
    }
}

因为客户端需要根据接口生成代理对象,所以这里需要你指定他到底是那个接口的实现类。

2.编写Netty服务器

Netty服务器采用主从Reactor模式及主Reactor只负责建立连接,从Reactor会负责到客户端数据的读写等业务处理。

@Component


public class NettyServer implements ServerStart {




    @Autowired
    private RpcServerConfiguration rpcServerConfiguration;

    @Override
    public void start() {
        //主Reactor 只负责连接建立所以只要1个线程就行
        EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("boosGroup"));
        //从Reactor 负责连接数据读写 传0默认为当前cpu核数*2个线程
        EventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("workerGroup"));
        //业务线程池 负责业务处理可以不要
        EventExecutorGroup eventExecutors = new UnorderedThreadPoolEventExecutor(8, new DefaultThreadFactory("businessGroup"));
        ServerBootstrap bootstrap = new ServerBootstrap();
        MxRequestHandle mxRequestHandle = new MxRequestHandle();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel nioSocketChannel) throws Exception {
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();
                        //编码
                        pipeline.addLast("OneEncoding", new LengthFieldPrepender(4));
                        pipeline.addLast("MxResponseEncode", new MxRpcResponseEncode());
                        //解码
                        pipeline.addLast("OneDecode", new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
                        pipeline.addLast("MxRequestDecode", new MxRpcRequestDecode());
                        pipeline.addLast(eventExecutors, "MxRpcRequest", mxRequestHandle);
                    }
                });

        try {
            ChannelFuture sync = bootstrap.bind(rpcServerConfiguration.getRpcPort()).sync();
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            eventExecutors.shutdownGracefully();
        }
    }
}

这里详解各个编码器和解码器

  1. OneEncoding 为二次编码器,new LengthFieldPrepender(4)为发送的每一条数据前面添加一个固定4个字节表示为数据长度,解决粘包,黏包问题,
  2. MxResponseEncode为一次编码器,把MxResponse转化为ByteBuf交给一次编码器进行封装。
  3. OneDecode为一次解码器,把封装为自动长度4个字节的数据后面的数据读出来交给MxRequestDecode二次解码器。
  4. MxRequestDecode为二次解码器,把ByteBuf对象转化为MxRequest对象然后交给MxRpcRequest业务处理器处理。

这里给出一个流程图

image.png

数据写出就不画了很懒

MxRquest对象 客户端请求数据 服务端拿到数据后可以根据该信息去查找对应的方法然后调用封装给客户端

@Data

@Builder

public class MxRequest {
    //请求id
    private String requestId;
    //请求的类
    private String className;
    //请求的方法名
    private String methodName;
    //参数类型
    private Class<?>[] parameterTypes;
    //参数
    private Object[] parameters;

}

MxReponse对象 响应对象

@Data

@Builder

public class MxResponse {
    private String requestId;
    private Object result;
}

MxRpcResponseEncode 把MxResponse转化为ByteBuf

public class MxRpcResponseEncode extends MessageToByteEncoder<MxResponse> {


    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MxResponse mxResponse, ByteBuf byteBuf) throws Exception {
        byteBuf.writeBytes(ProtostuffUtils.serialize(mxResponse));
    }
}

MxRpcRequestDecode 把客户端发过来的ByteBuf转化为MxRequest对象给业务处理器去处理

public class MxRpcRequestDecode extends ByteToMessageDecoder {






    @Override


    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        MxRequest deserialize = ProtostuffUtils.deserialize(bytes, MxRequest.class);
        list.add(deserialize);
    }
}

序列化采用的是Protostuff速度快也不用去用Proto生成的对象,不好用我就用封装的了。

业务处理handler

@ChannelHandler.Sharable
public class MxRequestHandle extends SimpleChannelInboundHandler<MxRequest> {




    @Override


    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MxRequest mxRequest) throws Exception {
        String requestId = mxRequest.getRequestId();
        String className = mxRequest.getClassName();
        Object[] parameters = mxRequest.getParameters();
        Class<?>[] parameterTypes = mxRequest.getParameterTypes();
        String methodName = mxRequest.getMethodName();
        //通过类名找到这个类
        Class bean = ServiceRegistrationScan.getBean(className);
        //拿到调用的方法
        Method method = bean.getMethod(methodName, parameterTypes);
        //调用方法
        Object invoke = method.invoke(bean, parameters);
        //封装返回值
        MxResponse build = MxResponse.builder().requestId(requestId).result(invoke).build();
        //写出
        channelHandlerContext.channel().writeAndFlush(build);
    }
}

3.测试服务注册功能

image.png
可见该类已经注册到ZK中。
后续可带来客户端调试,大家也发现了这和dubbo的很像大概一致,后续带来dubbo源码解析。

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYnmyiyE' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片