33.RocketMQ之Broker启动源码

Broker启动流程:BrokerStartup#main

  // Entry point of the broker process: builds a BrokerController from the
  // command-line arguments and hands it to start(...).
  public static void main(String[] args) {
        // Author's note (translated): the NameServer address was specified manually for this run.
        start(createBrokerController(args));
    }
 public static BrokerController start(BrokerController controller) {
        try {
            //启动brokerController
            controller.start();
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

1.创建brokercontroller

 public static BrokerController createBrokerController(String[] args) {
     
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, 
                                Integer.toString(MQVersion.CURRENT_VERSION));
​
        if (null == System.getProperty
         (NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)){
            NettySystemConfig.socketSndbufSize = 131072;
        }

​
        if (null == System.getProperty
          (NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)){
            NettySystemConfig.socketRcvbufSize = 131072;
        }
​
        try {
            //PackageConflictDetect.detectFastjson();
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil
                            .parseCmdLine("mqbroker", 
                                          args, 
                                          buildCommandlineOptions(options),
                                          new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
            }
​
            final BrokerConfig brokerConfig = new BrokerConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            final NettyClientConfig nettyClientConfig = new NettyClientConfig();
​
            nettyClientConfig.setUseTLS
                (Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
            nettyServerConfig.setListenPort(10911);
            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
​
            if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
                int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
            }
            
            //-c 解析配置文件填充配置类
            if (commandLine.hasOption('c')) {       
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    configFile = file;
                    InputStream in = new BufferedInputStream
                                            (new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);
​
                    properties2SystemEnv(properties);
                    MixAll.properties2Object(properties, brokerConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    MixAll.properties2Object(properties, nettyClientConfig);
                    MixAll.properties2Object(properties, messageStoreConfig);
​
                    BrokerPathConfigHelper.setBrokerConfigPath(file);
                    in.close();
                }
            }
​
            MixAll.properties2Object
                    (ServerUtil.commandLine2Properties(commandLine), brokerConfig);
            //获取Broker的安装地址
            if (null == brokerConfig.getRocketmqHome()) {
                System.exit(-2);
            }
            //获取配置中的nameServer的地址
            String namesrvAddr = brokerConfig.getNamesrvAddr();
            if (null != namesrvAddr) {
                try {
                    String[] addrArray = namesrvAddr.split(";");
                    for (String addr : addrArray) {
                        RemotingUtil.string2SocketAddress(addr);
                    }
                } catch (Exception e) {
                    System.out.printf(
                    System.exit(-3);
                }
            }
            //判断broker的主从角色
            switch (messageStoreConfig.getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    //MixAll.MASTER_ID = 0 代表是master
                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
                    break;
                case SLAVE:
                    //SLAVEID不能小于等于0
                    if (brokerConfig.getBrokerId() <= 0) {
                        System.out.printf("Slave's brokerId must be > 0");
                        System.exit(-3);
                    }
                    break;
                default:
                    break;
            }
​
            messageStoreConfig.setHaListenPort
                    (nettyServerConfig.getListenPort() + 1);
            LoggerContext lc = (LoggerContext) 
                            LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure
                (brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
​
            if (commandLine.hasOption('p')) {
                InternalLogger console = InternalLoggerFactory
                    .getLogger(LoggerName.BROKER_CONSOLE_NAME);
                MixAll.printObjectProperties(console, brokerConfig);
                MixAll.printObjectProperties(console, nettyServerConfig);
                MixAll.printObjectProperties(console, nettyClientConfig);
                MixAll.printObjectProperties(console, messageStoreConfig);
                System.exit(0);
            } else if (commandLine.hasOption('m')) {
                InternalLogger console = InternalLoggerFactory
                    .getLogger(LoggerName.BROKER_CONSOLE_NAME);
                MixAll.printObjectProperties(console, brokerConfig, true);
                MixAll.printObjectProperties(console, nettyServerConfig, true);
                MixAll.printObjectProperties(console, nettyClientConfig, true);
                MixAll.printObjectProperties(console, messageStoreConfig, true);
                System.exit(0);
            }
​
            log = InternalLoggerFactory
                .getLogger(LoggerName.BROKER_LOGGER_NAME);
            MixAll.printObjectProperties(log, brokerConfig);
            MixAll.printObjectProperties(log, nettyServerConfig);
            MixAll.printObjectProperties(log, nettyClientConfig);
            MixAll.printObjectProperties(log, messageStoreConfig);
​
            final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);
            
            controller.getConfiguration().registerConfig(properties);
            //初始化brokerController
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }
​
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);
​
                @Override
                public void run() {
                    synchronized (this) {
                        log.info("Shutdown hook was invoked, {}");
                        if (!this.hasShutdown) {
                            this.hasShutdown = true;
                            long beginTime = System.currentTimeMillis();
                            controller.shutdown();
                            long consumingTimeTotal = 
                                System.currentTimeMillis() - beginTime;
                            log.info
                                ("Shutdown hook over, consuming total time(ms): {}");
                        }
                    }
                }
            }, "ShutdownHook"));
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        return null;
    }
/**
 * Creates the controller and all of its collaborators from the four config objects.
 * Nothing is started here; this only stores the configs, constructs the managers,
 * services and bounded work queues, and builds the Configuration object.
 * Many collaborators receive {@code this}, so the statement order is kept as is.
 *
 * @param brokerConfig       broker-level settings (queue capacities, cluster name, IP, ...)
 * @param nettyServerConfig  settings of the remoting server (listen port is read here)
 * @param nettyClientConfig  settings passed to BrokerOuterAPI
 * @param messageStoreConfig settings of the message store
 */
public BrokerController(
        final BrokerConfig brokerConfig,
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig) {
    
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        this.consumerOffsetManager = new ConsumerOffsetManager(this);
        this.topicConfigManager = new TopicConfigManager(this);
        // Handles pull-message requests (author's note).
        this.pullMessageProcessor = new PullMessageProcessor(this);
        // Holds pull-message requests that cannot be answered yet (author's note).
        this.pullRequestHoldService = new PullRequestHoldService(this);
        // Listener: when a message arrives, a response can be returned through
        // pullRequestHoldService (author's note).
        this.messageArrivingListener = new 
        		NotifyMessageArrivingListener(this.pullRequestHoldService);
        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
        this.consumerFilterManager = new ConsumerFilterManager(this);
        this.producerManager = new ProducerManager();
        this.clientHousekeepingService = new ClientHousekeepingService(this);
        this.broker2Client = new Broker2Client(this);
        this.subscriptionGroupManager = new SubscriptionGroupManager(this);
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
        this.filterServerManager = new FilterServerManager(this);

        this.slaveSynchronize = new SlaveSynchronize(this);

        // Bounded work queues, one per request category; capacities come from brokerConfig.
        this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
        this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
        this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
        // Heartbeat
        this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
        this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());

        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
        // Store host = configured brokerIP1 + the remoting listen port.
        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));

        this.brokerFastFailure = new BrokerFastFailure(this);
        this.configuration = new Configuration(
            log,
            BrokerPathConfigHelper.getBrokerConfigPath(),
            this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
        );
    }

创建配置类

初始化配置主要任务是根据 properties 文件以及命令行参数值,创建了以下配置类:

•nettyServerConfig:封装了作为消息队列服务器的配置信息

•nettyClientConfig:封装了作为NameServer客户端配置信息

•brokerConfig:封装了 Broker 配置信息

•messageStoreConfig:封装了 RocketMQ 存储系统的配置信息

2.初始化Controller

/**
 * Loads the broker's persisted state and, if that succeeds, wires up the remoting servers,
 * the executors and the periodic housekeeping tasks.
 *
 * <p>The load steps are chained with {@code &&}, so the first failing step prevents all later
 * ones from running. Nothing is started here apart from the tasks handed to
 * {@code scheduledExecutorService}.
 *
 * @return {@code true} if every load step succeeded and the controller was fully wired,
 *         {@code false} otherwise
 * @throws CloneNotSupportedException if {@code nettyServerConfig} cannot be cloned for the
 *         second remoting server
 */
public boolean initialize() throws CloneNotSupportedException {
        // Topic configuration. Author's note: loads topics.json, parses it into a
        // TopicConfigSerializeWrapper and puts the entries into topicConfigTable.
        boolean result = this.topicConfigManager.load();

        // Consumer offsets. Author's note: loads consumerOffset.json and replaces the
        // offsetTable of the ConsumerOffsetManager.
        result = result && this.consumerOffsetManager.load();

        // Subscription groups.
        result = result && this.subscriptionGroupManager.load();

        // Consumer filters. Author's note: loads consumerFilter.json.
        result = result && this.consumerFilterManager.load();

        if (result) {
            try {
                // Create the message store (DefaultMessageStore), the core of the broker's
                // message storage. Author's note: constructing it also creates the service
                // threads that manage the store.
                this.messageStore = new DefaultMessageStore(
                    this.messageStoreConfig,
                    this.brokerStatsManager,
                    this.messageArrivingListener,
                    this.brokerConfig);

                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                    // With the DLedger commit log, role changes are reported to this handler.
                    DLedgerRoleChangeHandler roleChangeHandler =
                        new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);

                    ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog())
                        .getdLedgerServer()
                        .getdLedgerLeaderElector()
                        .addRoleChangeHandler(roleChangeHandler);
                }

                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);

                // Load store plugins: the factory may replace the store built above.
                MessageStorePluginContext context = new MessageStorePluginContext(
                    messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);

                this.messageStore.getDispatcherList().addFirst(
                    new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
        }

        // Load the message store. Author's notes on what this covers:
        //   1. delayed messages: delayOffset.json into the offset table
        //   2. commit log: the mapped-file queue under the configured store path
        //   3. consume queues
        // The short-circuit keeps this from running when the store could not be created.
        result = result && this.messageStore.load();

        if (result) {
            // Two remoting servers share the same housekeeping service; the second one
            // listens on the main listen port - 2.
            this.remotingServer = new NettyRemotingServer(
                this.nettyServerConfig, this.clientHousekeepingService);

            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(
                fastConfig, this.clientHousekeepingService);

            // Executors, sized from brokerConfig. Where a pool takes a queue it is the
            // bounded queue created in the constructor.

            // Send-message pool (author's note: default size min(availableProcessors, 4)).
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));

            // Pull-message pool (author's note: default size 16 + availableProcessors * 2).
            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));

            // Query-message pool.
            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.queryThreadPoolQueue,
                new ThreadFactoryImpl("QueryMessageThread_"));

            // Broker administration pool.
            this.adminBrokerExecutor = Executors.newFixedThreadPool(
                this.brokerConfig.getAdminBrokerThreadPoolNums(),
                new ThreadFactoryImpl("AdminBrokerThread_"));

            // Client management pool.
            this.clientManageExecutor = new ThreadPoolExecutor(
                this.brokerConfig.getClientManageThreadPoolNums(),
                this.brokerConfig.getClientManageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.clientManagerThreadPoolQueue,
                new ThreadFactoryImpl("ClientManageThread_"));

            // Heartbeat pool.
            this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.heartbeatThreadPoolQueue,
                new ThreadFactoryImpl("HeartbeatThread_", true));

            // End-transaction (transactional message) pool.
            this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.endTransactionThreadPoolQueue,
                new ThreadFactoryImpl("EndTransactionThread_"));

            // Consumer management pool.
            this.consumerManageExecutor = Executors.newFixedThreadPool(
                this.brokerConfig.getConsumerManageThreadPoolNums(),
                new ThreadFactoryImpl("ConsumerManageThread_"));

            this.registerProcessor();

            // From here on: periodic tasks. Each one catches Throwable so that a single
            // failure does not cancel the schedule.

            final long initialDelay = UtilAll.computNextMorningTimeMillis()
                - System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            // Once a day: record broker statistics (author's note: totals such as pulled
            // and sent messages).
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);

            // Persist the consumer offsets: first run after 10 s, then every
            // getFlushConsumerOffsetInterval() ms (author's note: 5 s, written to
            // consumerOffset.json).
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

            // Every 10 s: persist the consumer filter data (author's note: written to
            // consumerFilter.json).
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerFilterManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumer filter error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

            // Every 3 min: protect the broker. Author's note: disables consumers that read
            // too slowly; controlled by disableConsumeIfConsumerReadSlowly, default false.
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.protectBroker();
                    } catch (Throwable e) {
                        log.error("protectBroker error.", e);
                    }
                }
            }, 3, 3, TimeUnit.MINUTES);

            // Every second, after a 10 s delay: print the water marks (author's note: for
            // send, pull, query and transaction).
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.printWaterMark();
                    } catch (Throwable e) {
                        log.error("printWaterMark error.", e);
                    }
                }
            }, 10, 1, TimeUnit.SECONDS);

            // Every minute: log how many bytes are stored in the commit log but not yet
            // dispatched to the consume queues.
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        // Supply the value for the {} placeholder.
                        log.info("dispatch behind commit log {} bytes",
                            BrokerController.this.messageStore.dispatchBehindBytes());
                    } catch (Throwable e) {
                        log.error("schedule dispatchBehindBytes error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

            // NameServer addresses: use the configured list if there is one, otherwise
            // (when enabled) fetch it from the address server every 2 minutes.
            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(
                    this.brokerConfig.getNamesrvAddr());
            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }

            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                    // Slave: use the configured HA master address when it is at least 6
                    // characters long; otherwise flag that the address has to be updated
                    // periodically. Author's note: a slave also periodically pulls
                    // TopicConfig, ConsumerOffset, DelayOffset and SubscriptionGroupConfig
                    // from the master.
                    if (this.messageStoreConfig.getHaMasterAddress() != null
                        && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                        this.messageStore.updateHaMasterAddress(
                            this.messageStoreConfig.getHaMasterAddress());
                        this.updateMasterHAServerAddrPeriodically = false;
                    } else {
                        this.updateMasterHAServerAddrPeriodically = true;
                    }
                } else {
                    // Master: every minute print how far the slave is behind.
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.printMasterAndSlaveDiff();
                            } catch (Throwable e) {
                                log.error("schedule printMasterAndSlaveDiff error.", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                }
            }

            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                // Register a listener to reload SslContext
                try {
                    fileWatchService = new FileWatchService(
                        new String[] {
                            TlsSystemConfig.tlsServerCertPath,
                            TlsSystemConfig.tlsServerKeyPath,
                            TlsSystemConfig.tlsServerTrustCertPath
                        },
                        new FileWatchService.Listener() {
                            boolean certChanged, keyChanged = false;

                            @Override
                            public void onChanged(String path) {
                                // A changed trust certificate triggers a reload on its own.
                                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                    log.info("The trust certificate changed," +
                                        "reload the sslcontext");
                                    reloadServerSslContext();
                                }
                                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                    certChanged = true;
                                }
                                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                    keyChanged = true;
                                }
                                // Certificate and key are reloaded only once both changed.
                                if (certChanged && keyChanged) {
                                    log.info("The certificate and private key changed," +
                                        "reload the ssl context");
                                    certChanged = keyChanged = false;
                                    reloadServerSslContext();
                                }
                            }

                            private void reloadServerSslContext() {
                                ((NettyRemotingServer) remotingServer).loadSslContext();
                                ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                            }
                        });
                } catch (Exception e) {
                    // Best effort: the broker keeps running, but the cause is logged.
                    log.warn("FileWatchService created error, " +
                        "can't load the certificate dynamically", e);
                }
            }
            initialTransaction();
            initialAcl();
            initialRpcHooks();
        }
        return result;
    }

创建加载消息存储服务

messageStore

创建NettyRemotingServer

初始化定时任务

在线程池注册完后,就会开启各种定时任务

3.启动brokercontroller

  /**
   * Starts the broker's sub-services in a fixed order, registers the broker with the
   * NameServers once, and then schedules the periodic re-registration.
   *
   * <p>Every service is started only if its field is non-null.
   *
   * @throws Exception if one of the started services fails to start
   */
  public void start() throws Exception {
        // Message store. Author's notes on what its start launches: the flush task, the
        // commit-log thread, the store statistics service, the delayed-message service,
        // reputMessageService (dispatch to the consume queues) and the HA sync thread.
        if (null != this.messageStore) {
            this.messageStore.start();
        }

        // Netty: the main remoting server.
        if (null != this.remotingServer) {
            this.remotingServer.start();
        }

        // Netty: the second ("fast") remoting server.
        if (null != this.fastRemotingServer) {
            this.fastRemotingServer.start();
        }

        if (null != this.fileWatchService) {
            this.fileWatchService.start();
        }

        // Netty: the broker's outbound API (remoting client side).
        if (null != this.brokerOuterAPI) {
            this.brokerOuterAPI.start();
        }

        // Service that holds pull requests for long polling.
        if (null != this.pullRequestHoldService) {
            this.pullRequestHoldService.start();
        }

        // Periodic clean-up of inactive connections.
        if (null != this.clientHousekeepingService) {
            this.clientHousekeepingService.start();
        }

        // Filter server management.
        if (null != this.filterServerManager) {
            this.filterServerManager.start();
        }

        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        }

        // Register this broker with all NameServers ("all" refers to the NameServers).
        this.registerBrokerAll(true, false, true);

        // Periodic re-registration task.
        final Runnable periodicRegistration = new Runnable() {
            @Override
            public void run() {
                try {
                    // See registerBrokerAll.
                    BrokerController.this.registerBrokerAll(
                        true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        };
        // The configured period is capped at 60 s and then raised to at least 10 s, so it
        // always lies within 10-60 s. Author's note: the default is
        // registerNameServerPeriod = 1000 * 30, i.e. every 30 s.
        final long registerPeriodMillis =
            Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000));
        // The first run is delayed by 10 s because registerBrokerAll was just called above.
        this.scheduledExecutorService.scheduleAtFixedRate(
            periodicRegistration, 1000 * 10, registerPeriodMillis, TimeUnit.MILLISECONDS);

        if (null != this.brokerStatsManager) {
            this.brokerStatsManager.start();
        }

        if (null != this.brokerFastFailure) {
            this.brokerFastFailure.start();
        }
    }

启动brokercontroller

1.路由注册

Broker发送心跳包

image.png

RocketMQ路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳包,之后每隔30s再向集群中所有NameServer发送一次心跳包。NameServer收到心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdateTimestamp,并且每隔10s扫描一次brokerLiveTable;如果连续120s(也就是4个心跳周期)没有收到某个Broker的心跳包,NameServer将移除该Broker的路由信息,同时关闭Socket连接。

代码:BrokerController#start

/**
 * Abridged view of {@code start()}: only the NameServer registration part is shown.
 *
 * @throws Exception declared by the full method
 */
public void start() throws Exception {
    // (A large number of checks and service start-ups are omitted here.)

    // Register the broker information once, right away.
    this.registerBrokerAll(true, false, true);

    // Task that re-registers the broker with the NameServers.
    final Runnable periodicRegistration = new Runnable() {
        @Override
        public void run() {
            try {
                // See registerBrokerAll; the second argument (false) means the request
                // is not sent one-way.
                BrokerController.this
                    .registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    };
    // The configured period is capped at 60 s and then raised to at least 10 s, so the
    // interval is always within 10-60 s. Author's note: the default is
    // registerNameServerPeriod = 1000 * 30, i.e. 30 s.
    final long registerPeriodMillis =
        Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000));
    // The first run is delayed by 10 s.
    this.scheduledExecutorService.scheduleAtFixedRate(
        periodicRegistration, 1000 * 10, registerPeriodMillis, TimeUnit.MILLISECONDS);
}

BrokerController#registerBrokerAll

这里的all指的是所有的nameServer,也就是需要往所有的NS上报自己的信息。

/***
topicQueueTable:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡


brokerAddrTable:Broker基础信息,包括brokerName、所属集群名称、主备Broker地址

clusterAddrTable:Broker集群信息,存储集群中所有Broker名称

brokerLiveTable:Broker状态信息,NameServer每次收到心跳包时会替换该信息



filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤。

***/

/**
 * Builds the topic-configuration payload for this broker and, when registration is
 * forced or {@code needRegister} reports it is required, hands it to
 * {@code doRegisterBrokerAll}.
 *
 * @param checkOrderConfig passed through unchanged to {@code doRegisterBrokerAll}
 * @param oneway           passed through unchanged; callers in this file pass {@code false}
 *                         (the registration is not sent as a one-way message)
 * @param forceRegister    when {@code true}, {@code needRegister} is not consulted
 */
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    TopicConfigSerializeWrapper topicConfigWrapper =
        this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    final boolean writableAndReadable =
        PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            && PermName.isReadable(this.getBrokerConfig().getBrokerPermission());

    if (!writableAndReadable) {
        // The broker is not both writable and readable: replace the topic table with
        // copies whose permission is the broker-level permission.
        ConcurrentHashMap<String, TopicConfig> restrictedTable =
            new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig source : topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig restricted = new TopicConfig(
                source.getTopicName(),
                source.getReadQueueNums(),
                source.getWriteQueueNums(),
                this.brokerConfig.getBrokerPermission());
            restrictedTable.put(source.getTopicName(), restricted);
        }
        topicConfigWrapper.setTopicConfigTable(restrictedTable);
    }

    boolean shouldRegister = forceRegister;
    if (!shouldRegister) {
        // Only asked when registration is not forced (same short-circuit as "a || b").
        shouldRegister = needRegister(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills());
    }

    if (shouldRegister) {
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

BrokerController#doRegisterBrokerAll(内部调用 BrokerOuterAPI#registerBrokerAll,下面的代码将两者合并展示)

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {
//获得nameServer地址信息 nameServer地址信息是从broker的config文件加载得到的
//this.brokerOuterAPI.updateNameServerAddressList
//   				 (this.brokerConfig.getNamesrvAddr());
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
//遍历所有nameserver列表
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {



    //封装请求头
    final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    requestHeader.setHaServerAddr(haServerAddr);
    requestHeader.setCompressed(compressed);
	//封装请求体
    RegisterBrokerBody requestBody = new RegisterBrokerBody();
    requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
    requestBody.setFilterServerList(filterServerList);
    final byte[] body = requestBody.encode(compressed);
    final int bodyCrc32 = UtilAll.crc32(body);
    requestHeader.setBodyCrc32(bodyCrc32);
    
    final CountDownLatch countDownLatch = new 
        				CountDownLatch(nameServerAddressList.size());
    
    //for循环遍历所有的nameServer地址
    for (final String namesrvAddr : nameServerAddressList) {
        brokerOuterExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //向NameServer注册
                    RegisterBrokerResult result 
                        = registerBroker
                        	(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                    if (result != null) {
                        registerBrokerResultList.add(result);
                    }
                    log.info("register broker to nameServer {} OK");
                } catch (Exception e) {
                    log.warn("registerBroker Exception");
                } finally {
                    countDownLatch.countDown();
                }
            }
        });
    }


    try {
        countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    }
}

代码:BrokerOuterAPI#registerBroker

if (oneway) {// oneway is false on the registration path shown in this article, so this branch is skipped
    try {
         // oneway == true: one-way send via invokeOneway; no response is read and this
         // branch returns null.
         // oneway == false: the request goes through invokeSync below, which returns the
         // NameServer's response.
        this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
    } catch (RemotingTooMuchRequestException e) {
        // Ignore
    }
    return null;
}
// This is the path actually taken: synchronous request that returns the response.
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);

2.NameServer处理心跳包,更新路由信息

image.png

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor网络处理类解析请求类型,如果请求类型为REGISTER_BROKER,则将请求转发到RouteInfoManager#registerBroker

DefaultRequestProcessor#processRequest

//判断是注册Broker信息
case RequestCode.REGISTER_BROKER:
	Version brokerVersion = MQVersion.value2Version(request.getVersion());
	if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
	    return this.registerBrokerWithFilterServer(ctx, request);
	} else {
        //注册Broker信息
	    return this.registerBroker(ctx, request);
	}

DefaultRequestProcessor#registerBroker

// Delegate the registration to RouteInfoManager#registerBroker, passing the identifying
// fields from the request header, the topic configuration and the Netty channel of
// this request.
// NOTE(review): the null argument is presumably the filter-server list (this plain
// path has none) — confirm against the RouteInfoManager#registerBroker signature.
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
    requestHeader.getClusterName(),
    requestHeader.getBrokerAddr(),
    requestHeader.getBrokerName(),
    requestHeader.getBrokerId(),
    requestHeader.getHaServerAddr(),
    topicConfigWrapper,
    null,
    ctx.channel()
);

代码:RouteInfoManager#registerBroker

维护路由信息

// Acquire the write lock (interruptibly) before modifying the routing tables.
// NOTE(review): the matching unlock is not part of this excerpt.
this.lock.writeLock().lockInterruptibly();

// Maintain clusterAddrTable: make sure the cluster has a broker-name set,
// creating it on first use.
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
    brokerNames = new HashSet<String>();
    this.clusterAddrTable.put(clusterName, brokerNames);
}


brokerNames.add(brokerName);
// Maintain brokerAddrTable.

BrokerData brokerData = this.brokerAddrTable.get(brokerName);
// First registration for this broker name: create its BrokerData entry.
if (null == brokerData) {
    registerFirst = true;
    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
    this.brokerAddrTable.put(brokerName, brokerData);
}


// Update the broker's address map: drop any existing entry that maps this same address
// to a different brokerId, then store the address under the current brokerId.
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
    Entry<Long, String> item = it.next();
    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
        it.remove();
    }
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
// Also counts as a first registration when this brokerId had no previous address.
registerFirst = registerFirst || (null == oldAddr);
// Maintain topicQueueTable: only for a master broker that supplied a topic config,
// and only when its topic config changed or this is a first registration.
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || 
        registerFirst) {
        ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
        if (tcTable != null) {
            // Create or refresh the queue data of every topic reported by the broker.
            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                this.createAndUpdateQueueData(brokerName, entry.getValue());
            }
        }
    }
}

代码:RouteInfoManager#createAndUpdateQueueData

image.png

/**
 * Creates the {@code QueueData} describing {@code topicConfig} on {@code brokerName} and
 * merges it into {@code topicQueueTable}.
 *
 * <p>If the topic has no list yet, a new list containing the queue data is registered.
 * Otherwise any differing entry for the same broker name is removed, and the new queue
 * data is appended unless an equal entry for that broker already exists.
 *
 * @param brokerName  name of the broker the queue data belongs to
 * @param topicConfig topic configuration the queue data is built from
 */
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    // Build the candidate QueueData (the element type of the lists in topicQueueTable).
    QueueData candidate = new QueueData();
    candidate.setBrokerName(brokerName);
    candidate.setWriteQueueNums(topicConfig.getWriteQueueNums());
    candidate.setReadQueueNums(topicConfig.getReadQueueNums());
    candidate.setPerm(topicConfig.getPerm());
    candidate.setTopicSynFlag(topicConfig.getTopicSysFlag());

    final String topicName = topicConfig.getTopicName();
    List<QueueData> registered = this.topicQueueTable.get(topicName);

    // Topic not known yet: register a fresh list holding only the candidate.
    if (registered == null) {
        registered = new LinkedList<QueueData>();
        registered.add(candidate);
        this.topicQueueTable.put(topicName, registered);
        log.info("new topic registered, {} {}", topicName, candidate);
        return;
    }

    // Topic already known: look for entries belonging to the same broker name.
    boolean identicalEntryExists = false;
    for (Iterator<QueueData> cursor = registered.iterator(); cursor.hasNext();) {
        QueueData current = cursor.next();
        if (!current.getBrokerName().equals(brokerName)) {
            continue;
        }
        if (current.equals(candidate)) {
            // Same broker, same data: nothing new to add.
            identicalEntryExists = true;
        } else {
            // Same broker, different data: drop the stale entry.
            log.info("topic changed, {} OLD: {} NEW: {}", topicName, current,
                candidate);
            cursor.remove();
        }
    }

    // Append the candidate unless an identical entry for this broker was found.
    if (!identicalEntryExists) {
        registered.add(candidate);
    }
}
// Maintain brokerLiveTable: store a fresh BrokerLiveInfo for this broker address,
// stamped with the current time, replacing any previous entry.
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(
    System.currentTimeMillis(),
    topicConfigWrapper.getDataVersion(),
    channel,
    haServerAddr));
// Maintain filterServerTable: an empty list removes the broker's entry, a non-empty
// list replaces it; a null list leaves the table untouched.
if (filterServerList != null) {
    if (filterServerList.isEmpty()) {
        this.filterServerTable.remove(brokerAddr);
    } else {
        this.filterServerTable.put(brokerAddr, filterServerList);
    }

}





// When the registering broker is not the master, look up the master address of the same
// broker group; if the master has a live entry, return its HA server address and the
// master address in the result.
if (MixAll.MASTER_ID != brokerId) {
    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
    if (masterAddr != null) {
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
        if (brokerLiveInfo != null) {
            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
            result.setMasterAddr(masterAddr);
        }

    }
}

3.NameServer 路由删除

Broker每隔30s向NameServer发送一个心跳包,心跳包包含BrokerId、Broker地址、Broker名称、Broker所属集群名称以及Broker关联的FilterServer列表。但是如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker呢?NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLiveInfo的lastUpdateTimestamp距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker的连接,同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。

RocketMQ有两个触发点来删除路由信息

NameServer定期扫描

  • NameServer定期扫描brokerLiveTable检测上次心跳包与当前系统的时间差,如果时间超过120s,则需要移除broker。

Broker正常关闭

  • Broker在正常关闭的情况下,会执行unregisterBroker指令

这两种方式路由删除的方法都是一样的,就是从相关路由表中删除与该broker相关的信息。

image.png

代码:NamesrvController#initialize

// Scan for inactive brokers every 10 seconds, starting 5 seconds after scheduling.
final Runnable inactiveBrokerScan = new Runnable() {
    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
};
this.scheduledExecutorService.scheduleAtFixedRate(inactiveBrokerScan, 5, 10, TimeUnit.SECONDS);

代码:RouteInfoManager#scanNotActiveBroker

/**
 * Walks {@code brokerLiveTable} and evicts every broker whose last update timestamp plus
 * {@code BROKER_CHANNEL_EXPIRED_TIME} lies before the current time: its channel is
 * closed, its entry is removed, and {@code onChannelDestroy} is invoked to update the
 * remaining routing tables.
 */
public void scanNotActiveBroker() {
    for (Iterator<Entry<String, BrokerLiveInfo>> cursor = this.brokerLiveTable.entrySet().iterator();
         cursor.hasNext();) {
        Entry<String, BrokerLiveInfo> liveEntry = cursor.next();

        long lastSeen = liveEntry.getValue().getLastUpdateTimestamp();
        long expiresAt = lastSeen + BROKER_CHANNEL_EXPIRED_TIME;
        // Still within the allowed window: keep this broker.
        if (expiresAt >= System.currentTimeMillis()) {
            continue;
        }

        // Expired: close the connection, ...
        RemotingUtil.closeChannel(liveEntry.getValue().getChannel());
        // ... remove the broker from the live table, ...
        cursor.remove();
        // ... and update the other routing tables.
        this.onChannelDestroy(liveEntry.getKey(), liveEntry.getValue().getChannel());
    }
}

代码:RouteInfoManager#onChannelDestroy

// Acquire the write lock, then remove the broker address from brokerLiveTable and
// filterServerTable.
this.lock.writeLock().lockInterruptibly();

this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
// Maintain brokerAddrTable.

String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =this.brokerAddrTable.entrySet().iterator();
// Walk brokerAddrTable until the broker owning the address has been found.
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
    BrokerData brokerData = itBrokerAddrTable.next().getValue();
    // Walk the addresses registered under this broker name.
    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
    while (it.hasNext()) {
        Entry<Long, String> entry = it.next();
        Long brokerId = entry.getKey();
        String brokerAddr = entry.getValue();
        // Matching address: remember the broker name and remove the address entry.
        if (brokerAddr.equals(brokerAddrFound)) {
            brokerNameFound = brokerData.getBrokerName();
            it.remove();
            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                brokerId, brokerAddr);
            break;
        }
    }
    // If this broker name has no addresses left, remove its entry from brokerAddrTable
    // and flag that the name must also be removed from the other tables.
    if (brokerData.getBrokerAddrs().isEmpty()) {
        removeBrokerName = true;
        itBrokerAddrTable.remove();
        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
            brokerData.getBrokerName());
    }
}

// Maintain clusterAddrTable (only when the broker name itself was removed).
if (brokerNameFound != null && removeBrokerName) {
    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
    // Walk clusterAddrTable.
    while (it.hasNext()) {

        Entry<String, Set<String>> entry = it.next();
        // Cluster name.
        String clusterName = entry.getKey();
        // Broker names belonging to this cluster.
        Set<String> brokerNames = entry.getValue();
        // Remove brokerNameFound from this cluster's broker names.
        boolean removed = brokerNames.remove(brokerNameFound);
        if (removed) {
            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                brokerNameFound, clusterName);

            if (brokerNames.isEmpty()) {
                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                    clusterName);
                // The cluster no longer contains any broker: remove the cluster entry.
                it.remove();
            }

            // Stop after the first cluster that contained the broker name.
            break;
        }
    }
}
// Maintain topicQueueTable (only when the broker name itself was removed).
if (removeBrokerName) {
    // Walk topicQueueTable.
    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
        this.topicQueueTable.entrySet().iterator();
    while (itTopicQueueTable.hasNext()) {
        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
        // Topic name.
        String topic = entry.getKey();
        // Queue data list of this topic.
        List<QueueData> queueDataList = entry.getValue();
        // Walk the queue data of this topic.
        Iterator<QueueData> itQueueData = queueDataList.iterator();
        while (itQueueData.hasNext()) {
            // Remove the queue data that belongs to the removed broker name.
            QueueData queueData = itQueueData.next();
            if (queueData.getBrokerName().equals(brokerNameFound)) {
                itQueueData.remove();
                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                    topic, queueData);
            }
        }
        // If the topic has no queue data left, remove the topic itself.
        if (queueDataList.isEmpty()) {
            itTopicQueueTable.remove();
            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                topic);
        }
    }
}

// Release the write lock.
// NOTE(review): the try block this finally belongs to is not shown in this excerpt.
finally {
    this.lock.writeLock().unlock();
}

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYHSqIqu' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片