22.RocketMQ之NameServer启动流程

NameServerController启动流程总览

启动类:org.apache.rocketmq.namesrv.NamesrvStartup#main

public static void main(String[] args) {
    main0(args);
}



public static NamesrvController main0(String[] args) {
    try {
        //创建NamesrvController
        NamesrvController controller = createNamesrvController(args);
        //初始化并启动NamesrvController
        start(controller);
        String tip = "The Name Server boot success. serializeType=" +   
                        RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);


    }
    return null;
}

1.创建NamesrvController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {

    //设置MQ版本号

    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));




    //解析启动命令 start mqnamesrv.cmd

    Options options = ServerUtil.buildCommandlineOptions(new Options());

    commandLine = ServerUtil.parseCmdLine

            ("mqnamesrv",

             args, 

             buildCommandlineOptions(options), 

             new PosixParser());



    if (null == commandLine) {

        System.exit(-1);


        return null;

    }

​

    //创建NamesrvConfig

    final NamesrvConfig namesrvConfig = new NamesrvConfig();

    //创建NettyServerConfig

    final NettyServerConfig nettyServerConfig = new NettyServerConfig();

    //设置启动端口号9876

    nettyServerConfig.setListenPort(9876);

    //解析启动-c参数

    if (commandLine.hasOption('c')) {

        //-c指定配置文件

        String file = commandLine.getOptionValue('c');

        if (file != null) {

            //加载配置文件到流

            InputStream in = new BufferedInputStream

                                    (new FileInputStream(file));

            //加载属性到InputStream

            properties = new Properties();

            properties.load(in);

            //分别设置属性到namesrvConfig 和 nettyServerConfig  

            MixAll.properties2Object(properties, namesrvConfig);

            MixAll.properties2Object(properties, nettyServerConfig);

            //设置配置文件存储地址

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);

            in.close();

        }

    }

​


    //-p来指定是否打印配置项,在指定该选项时,直接退出。

    if (commandLine.hasOption('p')) {

        InternalLogger console 

                = InternalLoggerFactory.getLogger

                        (LoggerName.NAMESRV_CONSOLE_NAME);

        //打印namesrvConfig属性

        MixAll.printObjectProperties(console, namesrvConfig);

        //打印nettyServerConfig 属性

        MixAll.printObjectProperties(console, nettyServerConfig);

        System.exit(0);

    }

​

    //将启动参数填充到namesrvConfig,nettyServerConfig

    MixAll.properties2Object

                (ServerUtil.commandLine2Properties(commandLine), 

                                                        namesrvConfig);

​

    //创建NameServerController

    final NamesrvController controller 

            = new NamesrvController(namesrvConfig, nettyServerConfig);



    controller.getConfiguration().registerConfig(properties);

    return controller;

}

2.初始化NamesrvController

3.启动NamesrvController

 public static NamesrvController start(final NamesrvController controller) 

     															throws Exception {

        if (null == controller) {

            throw new IllegalArgumentException("NamesrvController is null");

        }


        boolean initResult = controller.initialize();

        if (!initResult) {

            controller.shutdown();

            System.exit(-3);

        }

        //注册JVM钩子函数代码

     	//在JVM进程关闭之前,先将线程池关闭,及时释放资源
     	//可以借鉴的地方
        Runtime.getRuntime()

            .addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {

            @Override

            public Void call() throws Exception {

                //释放资源

                controller.shutdown();

                return null;

            }

        }));

        controller.start();

        return controller;

    }



 public void shutdown() {

     	//关闭nettyServer
        this.remotingServer.shutdown();

       	//关闭线程池
        this.remotingExecutor.shutdown();

     	//关闭定时任务
        this.scheduledExecutorService.shutdown();

		//功能实现当文件内容发生变化时,重新加载文件,可用于读取配置类的文件。
     	//原理:注册一个listener,然后新开个线程,定期去扫描文件
     	//通过对文件内容进行hash来判断文件内容是否发生变化
     	//如果变化了,则回调监听器的onChange方法。
     	//看源码主要是监听证书
     	//关闭fileWatchService
        if (this.fileWatchService != null) {

            this.fileWatchService.shutdown();

        }

    }

大致流程如下图

image.png

上面对源码做了概览,大致知道了NameServerController启动的流程分为3步,创建-初始化-启动。

下面一步一步看吧。

1.创建nameServerController

1.1:解析配置文件,创建NameSrvController

解析配置文件,填充NameServerConfig、NettyServerConfig属性值,并创建NamesrvController

注意NameServer创建的是是NettyServerConfig,Broker创建的是NettyClientConfig

NamesrvStartup#createNamesrvController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {

    //设置MQ版本号

    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));




    //解析启动命令 start mqnamesrv.cmd

    Options options = ServerUtil.buildCommandlineOptions(new Options());

    commandLine = ServerUtil.parseCmdLine

            ("mqnamesrv",

             args, 

             buildCommandlineOptions(options), 

             new PosixParser());



    if (null == commandLine) {

        System.exit(-1);


        return null;

    }

​

    //创建NamesrvConfig

    final NamesrvConfig namesrvConfig = new NamesrvConfig();

    //创建NettyServerConfig

    final NettyServerConfig nettyServerConfig = new NettyServerConfig();

    //设置启动端口号9876

    nettyServerConfig.setListenPort(9876);

    //解析启动-c参数

    if (commandLine.hasOption('c')) {

        //-c指定配置文件

        String file = commandLine.getOptionValue('c');

        if (file != null) {

            //加载配置文件到流

            InputStream in = new BufferedInputStream

                                    (new FileInputStream(file));

            //加载属性到InputStream

            properties = new Properties();

            properties.load(in);

            //分别设置属性到namesrvConfig 和 nettyServerConfig  

            MixAll.properties2Object(properties, namesrvConfig);

            MixAll.properties2Object(properties, nettyServerConfig);

            //设置配置文件存储地址

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);

            in.close();

        }

    }

​


    //-p来指定是否打印配置项,在指定该选项时,直接退出。

    if (commandLine.hasOption('p')) {

        InternalLogger console 

                = InternalLoggerFactory.getLogger

                        (LoggerName.NAMESRV_CONSOLE_NAME);

        //打印namesrvConfig属性

        MixAll.printObjectProperties(console, namesrvConfig);

        //打印nettyServerConfig 属性

        MixAll.printObjectProperties(console, nettyServerConfig);

        System.exit(0);

    }

​

    //将启动参数填充到namesrvConfig,nettyServerConfig

    MixAll.properties2Object

                (ServerUtil.commandLine2Properties(commandLine), 

                                                        namesrvConfig);

​

    //创建NameServerController

    final NamesrvController controller 

            = new NamesrvController(namesrvConfig, nettyServerConfig);



    controller.getConfiguration().registerConfig(properties);

    return controller;

}

2.初始化nameServerController

2.1:初始化NamesrvController

根据启动属性创建NamesrvController实例,并初始化该实例。

NameServerController实例为NameServer核心控制器。

NamesrvController#initialize

public boolean initialize() {

    //加载KV配置
    this.kvConfigManager.load();


    /***
        这里需要看NettyRemotingServer构造方法
        会把netty的启动辅助类serverBootstrap创建好,这个是重点
        保存了channelEventListener。
        新建了netty的boss线程。
        创建publicExecutor线程池。
    ***/
    this.remotingServer = new NettyRemotingServer
                            (this.nettyServerConfig, this.brokerHousekeepingService);

    //创建线程池 默认是8个
    this.remotingExecutor =
        Executors.newFixedThreadPool
        (nettyServerConfig.getServerWorkerThreads(),  
                            new ThreadFactoryImpl("RemotingExecutorThread_"));

    /***
    创建DefaultRequestProcessor 作为netty server 请求处理器。
    org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest 
    处理所有已知request code类型的请求
    **/
    this.registerProcessor();


    //开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker
    //如果在2分钟都没有发送心跳 移除不活跃的Broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    //开启定时任务:每隔10min打印一次KV配置
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    return true;
}

2.2:启动定时任务:每10秒扫描一次所有broker

Broker30秒向NameServer发送一次心跳。

NamesrvController会开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker。

移除broker是根据broker的lastUpdateStamp+2分钟是否小于当前时间,如果小于就移除。

如果某个broker在2分钟内都没有发送心跳 那么就移除该broker 即连续4次没有发送心跳就移除

RouteInfoManager#scanNotActiveBroker

//扫描不活跃的broker
public void scanNotActiveBroker() {
//2分钟
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;


    //HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    Iterator<Entry<String, BrokerLiveInfo>> it 
                            = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {

        Entry<String, BrokerLiveInfo> next = it.next();
        //BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。
        long last = next.getValue().getLastUpdateTimestamp();
        //BrokerLiveInfo中的 
       //lastUpdateTimestamp+2分钟小于当前时间说明 已经2分钟没有心跳了
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            //关闭并移除channel
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            //销毁channel工作
            this.onChannelDestroy(next.getKey(), 
                                    next.getValue().getChannel());
        }
    }
}
移除2分钟没心跳的broker的路由元信息:RouteInfoManager#onChannelDestroy
    //路由元信息
    //类:RouteInfoManager
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

topicQueueTable: Topic消息队列路由信息,消息发送时根据路由表进行负载均衡

brokerAddrTable: Broker基础信息,包括brokerName、所属集群名称、主备Broker地址

clusterAddrTable: Broker集群信息,存储集群中所有Broker名称

brokerLiveTable: Broker状态信息,NameServer每次收到心跳包是会替换该信息

filterServerTable: Broker上的FilterServer列表,用于类模式消息过滤。

一个Topic拥有多个消息队列,一个Broker为每一个主题创建8个读队列和8个写队列。

多个Broker组成一个集群,集群由相同的多台Broker组成Master-Slave架构。

brokerId为0代表Master,大于0为Slave。

BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。

image.png

image.png

//主要就是移除路由信息表相关信息
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                //申请写锁,根据brokerAddress
                //从brokerLiveTable和filterServerTable移除
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry
                                        = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
​
    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed" + 
                            "clean it's data structure at once");
    }
​
    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
​
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
​


                //维护<String/* brokerName */, BrokerData> brokerAddrTable
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();

                //遍历brokerAddrTable
              while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    //获取brokerData
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();
                    //遍历该broker的所有地址 即主从
                    Iterator<Entry<Long, String>> it
                            = brokerData.getBrokerAddrs().entrySet().iterator();
                    //循环遍历主从
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                         //根据broker地址移除brokerAddr
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            break;
                        }
                    }
                    //如果移除以后没有其他的BrokerAddr 相当于这个broker已经没有实例了
                    //那么把brokerData也从BrokerAddrTable 移除
                   // <String/* brokerName */, BrokerData> brokerAddrTable
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                    }
                }

                /***
                   维护集群信息: key = clusterName value对应的set是 brokerName 
                   <String, Set<String>> clusterAddrTable
                   这里移除的条件是 removeBrokerName=true
                   removeBrokerName 是在移除brokerAddr时 当braokerData中的addrs为空
                   即该broker的主从都不存在 这个broker已经没有实例了
                   设置removeBrokerName=true
               ***/
                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> 
                            it = this.clusterAddrTable.entrySet().iterator();
                    //遍历clusterAddrTable
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                          //获得集群名称
                        String clusterName = entry.getKey();
                         //获得集群中brokerName集合
                        Set<String> brokerNames = entry.getValue();
                        //从brokerNames中移除brokerNameFound
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            if (brokerNames.isEmpty()) {
                                //如果集群中不包含任何broker,则移除该集群
                                it.remove();
                            }
                            break;
                        }
                    }
                }

                //<String/* topic */, List<QueueData>> topicQueueTable队列
                //这里移除的条件是 removeBrokerName=true
                //removeBrokerName 是在移除brokerAddr时 当brokerData中的addrs为空
                //即该broker的主从都不存在,这个broker已经没有实例了
                //设置removeBrokerName=true
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    //遍历topicQueueTable
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry
                                            = itTopicQueueTable.next();
                        //主题名称
                        String topic = entry.getKey();
                        //队列集合
                        List<QueueData> queueDataList = entry.getValue();
                        //遍历该主题队列
                        Iterator<QueueData> itQueueData
                                        = queueDataList.iterator();

                        while (itQueueData.hasNext()) {
                            //获取queueData
                            QueueData queueData = itQueueData.next();
                            //如果queueData中的brokerName等于本次移除的brokerName
                            //那么从队列中移除该queue
                            if (queueData.getBrokerName()
                                        .equals(brokerNameFound)) {
                                itQueueData.remove();
                            }
                        }
                        //如果该topic的队列为空,则移除该topic
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}

3.注册jvm钩子函数,启动NameServerController

3.1注册jvm钩子函数,启动NameSrvCtr

在JVM进程关闭之前,先将线程池关闭,及时释放资源

NamesrvStartup#start

 public static NamesrvController start(final NamesrvController controller) 

                                                                throws Exception {
​
        if (null == controller) {

            throw new IllegalArgumentException("NamesrvController is null");

        }

​
        boolean initResult = controller.initialize();

        if (!initResult) {

            controller.shutdown();

            System.exit(-3);

        }

        //注册JVM钩子函数代码

        //在JVM进程关闭之前,先将线程池关闭,及时释放资源
        //可以借鉴的地方
        Runtime.getRuntime()

            .addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {

            @Override

            public Void call() throws Exception {

                //释放资源

                controller.shutdown();

                return null;

            }

        }));

        controller.start();

        return controller;

    }

​
 public void shutdown() {

        //关闭nettyServer
        this.remotingServer.shutdown();

        //关闭线程池
        this.remotingExecutor.shutdown();

        //关闭定时任务
        this.scheduledExecutorService.shutdown();

        //功能实现当文件内容发生变化时,重新加载文件,可用于读取配置类的文件。
        //原理:注册一个listener,然后新开个线程,定期去扫描文件
        //通过对文件内容进行hash来判断文件内容是否发生变化
        //如果变化了,则回调监听器的onChange方法。
        //看源码主要是监听证书
        //关闭fileWatchService
        if (this.fileWatchService != null) {

            this.fileWatchService.shutdown();

        }

    }

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYeim8og' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片