Broker启动流程:BrokerStartup#main
/**
 * Entry point of the broker process: builds a {@code BrokerController} from the
 * command-line arguments and then starts it.
 *
 * @param args raw command-line arguments, forwarded to {@code createBrokerController}
 */
public static void main(String[] args) {
    // NOTE(review): the original note here says the NameServer address was specified
    // manually (presumably through the launch arguments) — confirm.
    final BrokerController controller = createBrokerController(args);
    start(controller);
}
/**
 * Starts the given controller.
 *
 * <p>Any {@code Throwable} raised by {@code controller.start()} is printed to the
 * standard error stream and the JVM is asked to exit with status {@code -1}.
 *
 * @param controller the controller to start
 * @return the same controller once it has been started; {@code null} is only reachable
 *         after the failure path has called {@code System.exit(-1)}
 */
public static BrokerController start(BrokerController controller) {
    BrokerController started = null;
    try {
        // Start the broker controller.
        controller.start();
        started = controller;
    } catch (Throwable failure) {
        failure.printStackTrace();
        System.exit(-1);
    }
    return started;
}
1.创建brokercontroller
public static BrokerController createBrokerController(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY,
Integer.toString(MQVersion.CURRENT_VERSION));
if (null == System.getProperty
(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)){
NettySystemConfig.socketSndbufSize = 131072;
}
if (null == System.getProperty
(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)){
NettySystemConfig.socketRcvbufSize = 131072;
}
try {
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil
.parseCmdLine("mqbroker",
args,
buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS
(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
//-c 解析配置文件填充配置类
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream
(new FileInputStream(file));
properties = new Properties();
properties.load(in);
properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
}
}
MixAll.properties2Object
(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
//获取Broker的安装地址
if (null == brokerConfig.getRocketmqHome()) {
System.exit(-2);
}
//获取配置中的nameServer的地址
String namesrvAddr = brokerConfig.getNamesrvAddr();
if (null != namesrvAddr) {
try {
String[] addrArray = namesrvAddr.split(";");
for (String addr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf(
System.exit(-3);
}
}
//判断broker的主从角色
switch (messageStoreConfig.getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
//MixAll.MASTER_ID = 0 代表是master
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
case SLAVE:
//SLAVEID不能小于等于0
if (brokerConfig.getBrokerId() <= 0) {
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
}
break;
default:
break;
}
messageStoreConfig.setHaListenPort
(nettyServerConfig.getListenPort() + 1);
LoggerContext lc = (LoggerContext)
LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure
(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory
.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
} else if (commandLine.hasOption('m')) {
InternalLogger console = InternalLoggerFactory
.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
MixAll.printObjectProperties(console, messageStoreConfig, true);
System.exit(0);
}
log = InternalLoggerFactory
.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
controller.getConfiguration().registerConfig(properties);
//初始化brokerController
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}");
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
controller.shutdown();
long consumingTimeTotal =
System.currentTimeMillis() - beginTime;
log.info
("Shutdown hook over, consuming total time(ms): {}");
}
}
}
}, "ShutdownHook"));
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
/**
 * Creates the controller from the four configuration objects and instantiates the
 * managers, services, work queues and statistics objects it owns.
 *
 * Most collaborators receive {@code this} in their constructor, so the order of the
 * assignments below is significant and is left untouched.
 *
 * @param brokerConfig       broker configuration (queue capacities, cluster name, broker IP are read here)
 * @param nettyServerConfig  server-side remoting configuration (the listen port is read here)
 * @param nettyClientConfig  client-side remoting configuration, handed to {@code BrokerOuterAPI}
 * @param messageStoreConfig message store configuration
 */
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig) {
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.messageStoreConfig = messageStoreConfig;
this.consumerOffsetManager = new ConsumerOffsetManager(this);
this.topicConfigManager = new TopicConfigManager(this);
// Handles pull-message requests.
this.pullMessageProcessor = new PullMessageProcessor(this);
// Holds (parks) pull-message requests; it is started later in start().
this.pullRequestHoldService = new PullRequestHoldService(this);
// Listener wired to pullRequestHoldService; per the original note, when a message
// arrives the response can be returned through pullRequestHoldService.
this.messageArrivingListener = new
NotifyMessageArrivingListener(this.pullRequestHoldService);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
this.consumerFilterManager = new ConsumerFilterManager(this);
this.producerManager = new ProducerManager();
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
this.filterServerManager = new FilterServerManager(this);
this.slaveSynchronize = new SlaveSynchronize(this);
// Bounded work queues, one per executor created in initialize(); capacities come from brokerConfig.
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
// Heartbeat
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
// Store host = brokerIP1 + the remoting listen port.
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
this.brokerFastFailure = new BrokerFastFailure(this);
// Configuration holder over the four config objects, bound to the broker config path.
this.configuration = new Configuration(
log,
BrokerPathConfigHelper.getBrokerConfigPath(),
this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);
}
创建配置类
初始化配置主要任务是根据 properties 文件以及命令行参数值,创建了以下配置类:
•nettyServerConfig:封装了作为消息队列服务器的配置信息
•nettyClientConfig:封装了作为NameServer客户端配置信息
•brokerConfig:封装了 Broker 配置信息
•messageStoreConfig:封装了 RocketMQ 存储系统的配置信息
2.初始化Controller
/**
 * Initializes the controller: loads the persisted broker state, creates and loads the
 * message store, builds the remoting servers and executors, registers the request
 * processors and schedules the periodic housekeeping tasks.
 *
 * <p>Every step is skipped as soon as an earlier one has failed ({@code result} is
 * {@code false}); in that case nothing after the failing step is set up.
 *
 * @return {@code true} when all load steps succeeded and the servers, executors and
 *         scheduled tasks were created; {@code false} otherwise
 * @throws CloneNotSupportedException if cloning {@code nettyServerConfig} for the second
 *         remoting server fails
 */
public boolean initialize() throws CloneNotSupportedException {
    // Topic configuration. Per the original notes this reads topics.json, parses it into a
    // TopicConfigSerializeWrapper and fills topicConfigTable.
    boolean result = this.topicConfigManager.load();

    // Consumer offsets. Per the original notes this reads consumerOffset.json and replaces
    // the offsetTable of the ConsumerOffsetManager.
    result = result && this.consumerOffsetManager.load();

    // Subscription groups.
    result = result && this.subscriptionGroupManager.load();

    // Consumer filters. Per the original notes this reads consumerFilter.json.
    result = result && this.consumerFilterManager.load();

    if (result) {
        try {
            // Create the message store (DefaultMessageStore), the broker's storage implementation.
            this.messageStore = new DefaultMessageStore(
                this.messageStoreConfig,
                this.brokerStatsManager,
                this.messageArrivingListener,
                this.brokerConfig);
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                // With the DLedger commit log enabled, role changes are reported by the leader elector.
                DLedgerRoleChangeHandler roleChangeHandler =
                    new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog())
                    .getdLedgerServer()
                    .getdLedgerLeaderElector()
                    .addRoleChangeHandler(roleChangeHandler);
            }
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
            // load plugin
            MessageStorePluginContext context = new MessageStorePluginContext(
                messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            this.messageStore.getDispatcherList().addFirst(
                new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }

    // Load the message store. Per the original notes this covers the delay offsets
    // (delayOffset.json), the commit log (mapped files under the configured store path)
    // and the consume queues. Short-circuits when the store could not be created.
    result = result && this.messageStore.load();

    if (result) {
        // Remoting servers: the main one on the configured listen port and a second one,
        // built from a clone of the same config, on listen port - 2.
        this.remotingServer = new NettyRemotingServer(
            this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(
            fastConfig, this.clientHousekeepingService);

        // Executors, sized from brokerConfig and fed by the queues created in the constructor.
        // Send-message pool. Per the original note the default size is
        // Math.min(Runtime.getRuntime().availableProcessors(), 4).
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_"));

        // Pull-message pool. Per the original note the default size is
        // 16 + Runtime.getRuntime().availableProcessors() * 2.
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));

        // Query-message pool.
        this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.queryThreadPoolQueue,
            new ThreadFactoryImpl("QueryMessageThread_"));

        // Broker administration pool.
        this.adminBrokerExecutor = Executors.newFixedThreadPool(
            this.brokerConfig.getAdminBrokerThreadPoolNums(),
            new ThreadFactoryImpl("AdminBrokerThread_"));

        // Client management pool.
        this.clientManageExecutor = new ThreadPoolExecutor(
            this.brokerConfig.getClientManageThreadPoolNums(),
            this.brokerConfig.getClientManageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.clientManagerThreadPoolQueue,
            new ThreadFactoryImpl("ClientManageThread_"));

        // Heartbeat pool.
        this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.heartbeatThreadPoolQueue,
            new ThreadFactoryImpl("HeartbeatThread_", true));

        // End-transaction (transactional message) pool.
        this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.endTransactionThreadPoolQueue,
            new ThreadFactoryImpl("EndTransactionThread_"));

        // Consumer management pool.
        this.consumerManageExecutor = Executors.newFixedThreadPool(
            this.brokerConfig.getConsumerManageThreadPoolNums(),
            new ThreadFactoryImpl("ConsumerManageThread_"));

        this.registerProcessor();

        final long initialDelay = UtilAll.computNextMorningTimeMillis()
            - System.currentTimeMillis();
        final long period = 1000 * 60 * 60 * 24;

        // Once a day, first at the time returned by computNextMorningTimeMillis():
        // record the broker statistics.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Throwable e) {
                    log.error("schedule record error.", e);
                }
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);

        // After 10 s, then every flushConsumerOffsetInterval ms: persist the consumer offsets.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumerOffset error.", e);
                }
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        // After 10 s, then every 10 s: persist the consumer filter data.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerFilterManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumer filter error.", e);
                }
            }
        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

        // Every 3 minutes: run protectBroker(). Per the original note this disables slow
        // consumers and is controlled by disableConsumeIfConsumerReadSlowly (default false).
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.protectBroker();
                } catch (Throwable e) {
                    log.error("protectBroker error.", e);
                }
            }
        }, 3, 3, TimeUnit.MINUTES);

        // After 10 s, then every second: print the water marks.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.printWaterMark();
                } catch (Throwable e) {
                    log.error("printWaterMark error.", e);
                }
            }
        }, 10, 1, TimeUnit.SECONDS);

        // After 10 s, then every minute: log how many bytes are in the commit log but have
        // not been dispatched yet.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    log.info("dispatch behind commit log {} bytes",
                        BrokerController.this.messageStore.dispatchBehindBytes());
                } catch (Throwable e) {
                    log.error("schedule dispatchBehindBytes error.", e);
                }
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

        // NameServer addresses: use the configured list if present; otherwise, when enabled,
        // fetch it periodically (after 10 s, then every 2 minutes).
        if (this.brokerConfig.getNamesrvAddr() != null) {
            this.brokerOuterAPI.updateNameServerAddressList(
                this.brokerConfig.getNamesrvAddr());
        } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                    } catch (Throwable e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                // Slave: use the configured HA master address when one is set (length >= 6);
                // otherwise flag that the master HA address has to be updated periodically.
                if (this.messageStoreConfig.getHaMasterAddress() != null
                    && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(
                        this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }
            } else {
                // Master: after 10 s, then every minute, print the master/slave difference.
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;

                        @Override
                        public void onChanged(String path) {
                            // A changed trust certificate triggers a reload on its own.
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed," +
                                    "reload the sslcontext");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            // Certificate and key are reloaded only once both have changed.
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed," +
                                    "reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }

                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                            ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                // Best effort: the broker keeps running without dynamic certificate reload,
                // but the cause is kept in the log.
                log.warn("FileWatchService created error, " +
                    "can't load the certificate dynamically", e);
            }
        }

        initialTransaction();
        initialAcl();
        initialRpcHooks();
    }
    return result;
}
创建加载消息存储服务
messageStore
创建NettyRemotingServer
初始化定时任务
在线程池注册完后,就会开启各种定时任务
3.启动brokercontroller
/**
 * Starts every service owned by this controller, registers the broker with the
 * NameServers once, and schedules the periodic re-registration.
 *
 * Each component is started only if it is non-null; the order below is the startup order.
 *
 * @throws Exception if one of the started components fails to start
 */
public void start() throws Exception {
// Message store. Per the original notes, starting it launches its service threads:
// flush-to-disk, commit log, store statistics, delayed/scheduled messages,
// dispatch to the consume queues (reputMessageService) and HA master/slave sync.
if (this.messageStore != null) {
this.messageStore.start();
}
// Main remoting server.
if (this.remotingServer != null) {
this.remotingServer.start();
}
// Second remoting server (fastRemotingServer).
if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
// File watcher; only created in initialize() when TLS is not disabled.
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
// Outbound API of the broker (remoting client side).
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
// Service that holds pull requests (long polling, per the original notes).
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
// Client housekeeping (per the original notes: periodically clears inactive connections).
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}
// Filter server manager.
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
// Without the DLedger commit log, role-dependent processors and slave sync are set up here.
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
}
// Register this broker with all NameServers ("all" refers to the NameServers).
this.registerBrokerAll(true, false, true);
// Periodically re-register this broker with the NameServers.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// See registerBrokerAll.
BrokerController.this.registerBrokerAll
(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
// First run after 10 s, because registerBrokerAll was already called once above.
// The period is registerNameServerPeriod clamped to [10 s, 60 s]: min(period, 60000)
// first, then max(10000, ...). Per the original note the default is 1000 * 30 ms,
// so the effective default period is 30 s.
}, 1000 * 10,
Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)),
TimeUnit.MILLISECONDS);
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
}
启动brokercontroller
1.路由注册
Broker发送心跳包
RocketMQ路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳信息,此后每隔30s向集群中所有NameServer发送一次心跳包。NameServer收到心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdateTimestamp信息;NameServer每隔10s扫描一次brokerLiveTable,如果连续120s(约4个心跳周期)没有收到心跳包,NameServer将移除该Broker的路由信息,同时关闭Socket连接。
代码:BrokerController#start
/**
 * Abridged excerpt of {@code BrokerController#start} showing only the route-registration
 * part: one immediate registration followed by a periodic one.
 */
public void start() throws Exception {
// (the preceding checks and component start-up calls are omitted in this excerpt)
// Register the broker information.
this.registerBrokerAll(true, false, true);
// Periodically report the broker information to the NameServers.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// See registerBrokerAll.
// Second argument (oneway) is false: the request is not sent one-way.
BrokerController.this
.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
// First run is delayed by 10 s.
// Per the original note: private int registerNameServerPeriod = 1000 * 30;
// The period is that value clamped to [10 s, 60 s] (min with 60000, then max with 10000),
// so the interval can be configured only within those bounds.
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
}
BrokerController#registerBrokerAll
这里的all指的是所有的nameServer,也就是需要往所有的NS上报自己的信息。
/**
 * Registers this broker with the NameServers.
 *
 * <p>Background from the original notes — the NameServer keeps these route tables, which
 * the registration feeds:
 * <ul>
 *   <li>topicQueueTable: queue routing information per topic, used for load balancing when sending;</li>
 *   <li>brokerAddrTable: basic broker information (broker name, cluster name, master/slave addresses);</li>
 *   <li>clusterAddrTable: the broker names of each cluster;</li>
 *   <li>brokerLiveTable: broker liveness information, replaced on every heartbeat;</li>
 *   <li>filterServerTable: the FilterServer list of each broker, used for class-mode filtering.</li>
 * </ul>
 *
 * <p>When the broker permission is not both readable and writable, every topic config in
 * the wrapper is replaced by a copy that carries the broker permission. The actual
 * registration is performed only when {@code forceRegister} is set or
 * {@code needRegister(...)} reports that it is required.
 *
 * @param checkOrderConfig forwarded unchanged to {@code doRegisterBrokerAll}
 * @param oneway           forwarded unchanged to {@code doRegisterBrokerAll}; the callers
 *                         in these notes pass {@code false} (not a one-way request)
 * @param forceRegister    register without consulting {@code needRegister}
 */
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    TopicConfigSerializeWrapper topicConfigWrapper =
        this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    final boolean readableAndWritable =
        PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            && PermName.isReadable(this.getBrokerConfig().getBrokerPermission());
    if (!readableAndWritable) {
        // Rebuild the topic table so that each entry carries the broker-level permission.
        ConcurrentHashMap<String, TopicConfig> restrictedTable =
            new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig source : topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig restricted = new TopicConfig(
                source.getTopicName(),
                source.getReadQueueNums(),
                source.getWriteQueueNums(),
                this.brokerConfig.getBrokerPermission());
            restrictedTable.put(source.getTopicName(), restricted);
        }
        topicConfigWrapper.setTopicConfigTable(restrictedTable);
    }

    final boolean shouldRegister = forceRegister
        || needRegister(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills());
    if (!shouldRegister) {
        return;
    }
    doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
BrokerOuterAPI#doRegisterBrokerAll
// Excerpt (the end of the method is not shown): sends the registration request to every
// known NameServer in parallel and waits for the replies up to timeoutMills.
// NOTE(review): the body reads brokerAddr, brokerId, brokerName, clusterName, haServerAddr,
// compressed, filterServerList, timeoutMills and registerBrokerResultList, none of which
// are declared by the signature shown here — presumably the excerpt merges two methods;
// confirm against the real source.
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
// Get the NameServer address list. In initialize() it is filled from the broker config via
// this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()).
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
// Only proceed when at least one NameServer is known.
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
// Build the request header.
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
// Build the request body (topic config wrapper + filter server list) and its CRC32.
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
// One latch count per NameServer, so the wait below ends when all tasks have finished.
final CountDownLatch countDownLatch = new
CountDownLatch(nameServerAddressList.size());
// Submit one registration task per NameServer address.
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Register with this NameServer.
RegisterBrokerResult result
= registerBroker
(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
// NOTE(review): the "{}" placeholder has no argument here, so the address is not logged.
log.info("register broker to nameServer {} OK");
} catch (Exception e) {
// NOTE(review): the exception and the NameServer address are not passed to the logger.
log.warn("registerBroker Exception");
} finally {
// Always count down, success or failure, so the waiting thread is released.
countDownLatch.countDown();
}
}
});
}
try {
// Wait for all registrations, but no longer than timeoutMills.
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// NOTE(review): the interruption is swallowed without restoring the interrupt flag.
}
}
代码:BrokerOuterAPI#registerBroker
if (oneway) {// oneway is false on the registration path shown in these notes
try {
// oneway == true: one-way communication. The request is sent with invokeOneway and this
// branch returns null, so the broker does not consume any NameServer response.
// oneway == false: this branch is skipped and the synchronous invokeSync call below is
// used, so the broker receives the NameServer's response.
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
// This is the path actually taken when oneway is false.
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
2.NameServer处理心跳包,更新路由信息
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor网络处理类解析请求类型,如果请求类型为REGISTER_BROKER,则将请求转发到RouteInfoManager#registerBroker
DefaultRequestProcessor#processRequest
// The request is a broker registration.
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
// Brokers at version V3_0_11 or later (compared by enum ordinal) are registered together
// with their filter servers.
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
// Register the broker information only.
return this.registerBroker(ctx, request);
}
DefaultRequestProcessor#registerBroker
// Delegate to RouteInfoManager#registerBroker with the fields of the request header, the
// decoded topic config wrapper and the channel the request arrived on.
// NOTE(review): the null argument is presumably the filter server list (this code path
// registers without filter servers) — confirm against the RouteInfoManager signature.
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
代码:RouteInfoManager#registerBroker
维护路由信息
// Acquire the write lock (interruptibly) before touching the route tables.
this.lock.writeLock().lockInterruptibly();
// Maintain clusterAddrTable: make sure the cluster has a broker-name set and add this broker name.
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
// Maintain brokerAddrTable.
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
// First registration of this broker name: create its BrokerData.
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
// Remove any existing entry that maps a different brokerId to this same address,
// then store brokerId -> brokerAddr.
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
// Also counts as a first registration when this brokerId had no address before.
registerFirst = registerFirst || (null == oldAddr);
// Maintain topicQueueTable: only for a master broker that sent a topic config wrapper,
// and only when its topic config changed or this is a first registration.
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) ||
registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
代码:RouteInfoManager#createAndUpdateQueueData
/**
 * Creates the {@code QueueData} describing {@code topicConfig} on {@code brokerName} and
 * merges it into {@code topicQueueTable}.
 *
 * <p>If the topic has no entry yet, a new list holding the queue data is registered.
 * Otherwise every existing element of the same broker that differs from the new queue
 * data is removed, and the new queue data is appended unless an equal element of that
 * broker is already present.
 *
 * @param brokerName  name of the broker the queue data belongs to
 * @param topicConfig topic configuration reported by that broker
 */
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    // Build the QueueData (the element type of the lists stored in topicQueueTable).
    final QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

    final List<QueueData> existing = this.topicQueueTable.get(topicConfig.getTopicName());

    // Unknown topic: register a fresh list containing only this queue data.
    if (null == existing) {
        final List<QueueData> created = new LinkedList<QueueData>();
        created.add(queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), created);
        log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
        return;
    }

    // Known topic: drop outdated entries of this broker and remember whether an
    // identical entry is already there.
    boolean alreadyPresent = false;
    for (Iterator<QueueData> cursor = existing.iterator(); cursor.hasNext();) {
        final QueueData current = cursor.next();
        if (!current.getBrokerName().equals(brokerName)) {
            continue;
        }
        if (current.equals(queueData)) {
            alreadyPresent = true;
        } else {
            log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), current,
                queueData);
            cursor.remove();
        }
    }

    if (!alreadyPresent) {
        existing.add(queueData);
    }
}
// Maintain brokerLiveTable: replace the entry of this broker address with a new
// BrokerLiveInfo stamped with the current time, the data version, the channel and the
// HA server address.
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
// Maintain filterServerTable: an empty list removes the entry, a non-empty list replaces
// it, and null leaves the table untouched.
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
// For a non-master broker, report the master's address and the master's HA server address
// in the result, provided the master is registered and has a live-table entry.
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
3.NameServer 路由删除
Broker每隔30s向NameServer发送一个心跳包,心跳包包含BrokerId、Broker地址、Broker名称、Broker所属集群名称、Broker关联的FilterServer列表。但是如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker呢?NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLiveInfo的lastUpdateTimestamp距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker的连接,同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。
RocketMQ有两个触发点来删除路由信息
NameServer定期扫描
- NameServer定期扫描brokerLiveTable检测上次心跳包与当前系统的时间差,如果时间超过120s,则需要移除broker。
Broker正常关闭
- Broker在正常关闭的情况下,会执行unregisterBroker指令
这两种方式路由删除的方法都是一样的,就是从相关路由表中删除与该broker相关的信息。
代码:NamesrvController#initialize
// Scan for inactive brokers every 10 s; the first scan runs 5 s after scheduling.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
代码:RouteInfoManager#scanNotActiveBroker
/**
 * Walks {@code brokerLiveTable} and evicts every broker whose last update is older than
 * {@code BROKER_CHANNEL_EXPIRED_TIME} (120 s according to the accompanying notes).
 *
 * <p>For each expired entry the channel is closed, the entry is removed from the live
 * table, and {@code onChannelDestroy} is called to clean up the remaining route tables.
 */
public void scanNotActiveBroker() {
    for (Iterator<Entry<String, BrokerLiveInfo>> cursor = this.brokerLiveTable.entrySet().iterator();
         cursor.hasNext();) {
        final Entry<String, BrokerLiveInfo> liveEntry = cursor.next();
        final long lastUpdate = liveEntry.getValue().getLastUpdateTimestamp();

        // Still within the allowed heartbeat window: keep the broker.
        if ((lastUpdate + BROKER_CHANNEL_EXPIRED_TIME) >= System.currentTimeMillis()) {
            continue;
        }

        // Expired: close the connection, drop the live-table entry, then fix up the route tables.
        RemotingUtil.closeChannel(liveEntry.getValue().getChannel());
        cursor.remove();
        this.onChannelDestroy(liveEntry.getKey(), liveEntry.getValue().getChannel());
    }
}
代码:RouteInfoManager#onChannelDestroy
// Excerpt of onChannelDestroy (the method header and the matching try are not shown).
// Acquire the write lock, then remove the broker address from brokerLiveTable and filterServerTable.
this.lock.writeLock().lockInterruptibly();
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
// Maintain brokerAddrTable.
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =this.brokerAddrTable.entrySet().iterator();
// Walk brokerAddrTable until the broker name owning this address has been found.
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
BrokerData brokerData = itBrokerAddrTable.next().getValue();
// Walk the addresses registered under this broker name.
Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> entry = it.next();
Long brokerId = entry.getKey();
String brokerAddr = entry.getValue();
// Remove the entry whose address matches the destroyed broker address.
if (brokerAddr.equals(brokerAddrFound)) {
brokerNameFound = brokerData.getBrokerName();
it.remove();
log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
brokerId, brokerAddr);
break;
}
}
// If this broker name has no addresses left, remove the broker name from brokerAddrTable.
if (brokerData.getBrokerAddrs().isEmpty()) {
removeBrokerName = true;
itBrokerAddrTable.remove();
log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
brokerData.getBrokerName());
}
}
// Maintain clusterAddrTable, only when the broker name itself was removed above.
if (brokerNameFound != null && removeBrokerName) {
Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
// Walk clusterAddrTable.
while (it.hasNext()) {
Entry<String, Set<String>> entry = it.next();
// Cluster name.
String clusterName = entry.getKey();
// Broker names belonging to this cluster.
Set<String> brokerNames = entry.getValue();
// Remove brokerNameFound from this cluster's broker names.
boolean removed = brokerNames.remove(brokerNameFound);
if (removed) {
log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
brokerNameFound, clusterName);
if (brokerNames.isEmpty()) {
log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
clusterName);
// The cluster no longer contains any broker: remove the cluster as well.
it.remove();
}
// Stop after the first cluster that contained the broker name.
break;
}
}
}
// Maintain topicQueueTable, only when the broker name itself was removed.
if (removeBrokerName) {
// Walk topicQueueTable.
Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
this.topicQueueTable.entrySet().iterator();
while (itTopicQueueTable.hasNext()) {
Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
// Topic name.
String topic = entry.getKey();
// Queue data list of this topic.
List<QueueData> queueDataList = entry.getValue();
// Walk the queue data of this topic.
Iterator<QueueData> itQueueData = queueDataList.iterator();
while (itQueueData.hasNext()) {
// Remove the queue data that belongs to the inactive broker.
QueueData queueData = itQueueData.next();
if (queueData.getBrokerName().equals(brokerNameFound)) {
itQueueData.remove();
log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
topic, queueData);
}
}
// If the topic has no queue data left, remove the topic.
if (queueDataList.isEmpty()) {
itTopicQueueTable.remove();
log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
topic);
}
}
}
// Release the write lock.
finally {
this.lock.writeLock().unlock();
}