手撸RPC框架 – 注册中心基础功能实现

大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧

本章源码地址:gitee.com/baojh123/se…

自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16

前言

在之前我们实现了服务提供者发送请求,已经服务提供者收到请求以后调用业务真实方法,然后返回响应结果了。接下来我们需要完成的是注册中心的基础功能

一个完整的RPC框架,注册中心是其中最重要的一部分,服务提供者需要注册到注册中心,之后服务提供者需要从注册中心获取到服务提供者,然后才能发送请求。

实现

创建一个注册中心模块 xpc-rpc-register,然后创建二个子项目,分别是:xpc-rpc-register-commonxpc-rpc-register-zookeeper,项目结构如下

image.png

xpc-rpc-register-commonpom.xml文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>xpc-rpc-register</artifactId>
        <groupId>com.xpc</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>xpc-rpc-register-common</artifactId>



    <dependencies>
        <dependency>
            <groupId>com.xpc</groupId>
            <artifactId>xpc-rpc-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

创建一个注册配置类 com.xpc.rpc.register.common.config.RegisterConfig

package com.xpc.rpc.register.common.config;




import com.xpc.rpc.common.utils.yml.BootYamlUtils;


import java.net.InetAddress;
import java.net.UnknownHostException;


/**
 * 注册的基本配置类
 */
public class RegisterConfig {


    /**
     * 注册地址
     */
    private String registerAddress;

    /**
     * 注册类型
     */
    private String registerType;

    /**
     * Netty监听端口
     */
    private Integer registerPort;


    /**
     * 服务名称
     */
    private String applicationName;

    /**
     * 项目服务的监听端口
     */
    private Integer serverPort;

    /**
     * 本机的IP地址
     */
    private String serverAddress;



    public RegisterConfig() {
        this.registerAddress = (String) BootYamlUtils.getProperties("register.address");
        this.registerType = (String)BootYamlUtils.getProperties("register.type");
        this.applicationName = (String)BootYamlUtils.getProperties("application.name");
        this.registerPort = (Integer)BootYamlUtils.getProperties("register.port");
        this.serverPort = (Integer)BootYamlUtils.getProperties("server.port");
        try {
            this.serverAddress = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }



    public String getRegisterAddress() {
        return registerAddress;
    }



    public void setRegisterAddress(String registerAddress) {
        this.registerAddress = registerAddress;
    }



    public String getRegisterType() {
        return registerType;
    }



    public void setRegisterType(String registerType) {
        this.registerType = registerType;
    }



    public String getApplicationName() {
        return applicationName;
    }



    public void setApplicationName(String applicationName) {
        this.applicationName = applicationName;
    }



    public Integer getRegisterPort() {
        return registerPort;
    }


    public void setRegisterPort(Integer registerPort) {
        this.registerPort = registerPort;
    }

    public Integer getServerPort() {
        return serverPort;
    }

    public void setServerPort(Integer serverPort) {
        this.serverPort = serverPort;
    }

    public String getServerAddress() {
        return serverAddress;
    }

    public void setServerAddress(String serverAddress) {
        this.serverAddress = serverAddress;
    }
}
  • registerType:注册类型,有zookeeper,nacos,consul等注册中心
  • registerAddress:注册中心的地址,zookeeper就是127.0.0.1:2181
  • registerPort:Netty启动需要绑定一个监听端口,也就是Netty服务端的监听端口
  • applicationName:服务名称,也就是我们自己项目的应用名称
  • serverPort:我们自己服务的监听端口
  • serverAddress:我们自己服务部署的机器的IP地址

在配置类的构造方法中,我们通过使用自己的工具类,从配置文件中获取对应的配置的值,然后进行赋值操作

创建一个服务注册接口 com.xpc.rpc.register.common.RegisterService

package com.xpc.rpc.register.common;




import com.xpc.rpc.protocol.meta.ServiceMeta;



public interface RegisterService {

      /**
       * 服务注册
       * @param serviceMeta 服务注册元数据
       */
      void register(ServiceMeta serviceMeta) throws Exception;


      /**
       * 服务发现
       * @param serviceName
       * @return
       * @throws Exception
       */
      ServiceMeta discovery(String serviceName) throws Exception;
}

接下来就是服务注册的元数据类:com.xpc.rpc.protocol.meta.ServiceMeta

package com.xpc.rpc.protocol.meta;




/**
 * 服务注册元数据
 */
public class ServiceMeta {


    private String interfaceName;

    /**
     * 本机的服务地址
     */
    private String serverAddress;


    /**
     * 服务端口号
     */


    private Integer serverPort;

    /**
     * Netty监听端口
     */
    private Integer registerPort;


    /**
     * 服务名称
     */
    private String applicationName;
    
    /**
     * 负载均衡策略
     */
    private String loanBalancedType;


    public String getLoanBalancedType() {
        return loanBalancedType;
    }

    public void setLoanBalancedType(String loanBalancedType) {
        this.loanBalancedType = loanBalancedType;
    }



    public String getInterfaceName() {
        return interfaceName;
    }

    public void setInterfaceName(String interfaceName) {
        this.interfaceName = interfaceName;
    }

    public Integer getRegisterPort() {
        return registerPort;
    }



    public void setRegisterPort(Integer registerPort) {
        this.registerPort = registerPort;
    }



    public String getApplicationName() {
        return applicationName;
    }



    public void setApplicationName(String applicationName) {
        this.applicationName = applicationName;
    }



    public String getServerAddress() {
        return serverAddress;
    }



    public void setServerAddress(String serverAddress) {
        this.serverAddress = serverAddress;
    }



    public Integer getServerPort() {
        return serverPort;
    }



    public void setServerPort(Integer serverPort) {
        this.serverPort = serverPort;
    }

}

服务注册和发现的接口设计好了,那么接下来就是去实现这二个接口具体功能了,具体的实现功能是在 com.xpc.rpc.register.zookeeper.ZookeeperRegisterServiceImpl 类中

package com.xpc.rpc.register.zookeeper;




import com.xpc.rpc.protocol.meta.ServiceMeta;

import com.xpc.rpc.register.common.RegisterService;
import com.xpc.rpc.register.common.config.RegisterConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;


/**
 * zookeeper注册中心功能实现
 */
public class ZookeeperRegisterServiceImpl implements RegisterService {

    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperRegisterServiceImpl.class);


    private static final int BASE_SLEEP_TIME_MS = 1000;


    private static final int MAX_RETRIES = 3;

    private static final String ZK_BASE_PATH = "/xpc_rpc";

    private ServiceDiscovery<ServiceMeta> serviceDiscovery;

    private RegisterConfig registerConfig;



    public ZookeeperRegisterServiceImpl(RegisterConfig registerConfig) {
        this.registerConfig = registerConfig;
        CuratorFramework client = CuratorFrameworkFactory.newClient(registerConfig.getRegisterAddress(), new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
        client.start();
        JsonInstanceSerializer<ServiceMeta> serializer = new JsonInstanceSerializer<>(ServiceMeta.class);
        this.serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMeta.class)
                .client(client)
                .serializer(serializer)
                .basePath(ZK_BASE_PATH)
                .build();
        try {
            this.serviceDiscovery.start();
        } catch (Exception e) {
            LOGGER.error("serviceDiscovery start error :{}",e);
        }
    }

    //服务注册
    @Override
    public void register(ServiceMeta serviceMeta) throws Exception {
        ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance
                .<ServiceMeta>builder()
                .name(serviceMeta.getApplicationName())
                .address(serviceMeta.getServerAddress())
                .port(serviceMeta.getServerPort())
                .payload(serviceMeta)
                .build();
        serviceDiscovery.registerService(serviceInstance);
    }

    //服务发现
    @Override
    public ServiceMeta discovery(String serviceName) throws Exception {
        Collection<ServiceInstance<ServiceMeta>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);
        // TODO 后续会根据负载均衡策略来选取一个服务提供者
        return selectByLoadBalanceType((List<ServiceInstance<ServiceMeta>>)serviceInstances);
    }

    private ServiceMeta selectByLoadBalanceType(List<ServiceInstance<ServiceMeta>> list) {
        if(list == null || list.isEmpty()) {
            return null;
        }
        return list.get(0).getPayload();
    }
}

至此注册中心的功能就实现完毕了,接下来就是将服务提供者和服务消费者整合到注册中心

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYNWzJZG' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片