生产者DefaultMQProducer启动
消息生产者的代码都在client模块中,相对于RocketMQ来讲,消息生产者既是客户端,也是消息的提供者。
下面来看下DefaultMQProducer相关的方法和属性。
DefaultMQProducer相关的方法
-
//创建主题 void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
-
//根据时间戳从队列中查找消息偏移量 long searchOffset(final MessageQueue mq, final long timestamp)
-
//查找消息队列中最大的偏移量 long maxOffset(final MessageQueue mq) throws MQClientException;
-
//查找消息队列中最小的偏移量 long minOffset(final MessageQueue mq)
-
//根据messageId查找消息 MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
-
//根据条件查找消息 QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException;
-
//根据消息ID和主题查找消息 MessageExt viewMessage(String topic,String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
-
//启动 void start() throws MQClientException;
-
//关闭 void shutdown();
-
//查找该主题下所有消息 List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
-
//同步发送消息 SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
//同步超时发送消息 SendResult send(final Message msg, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
//异步发送消息 void send(final Message msg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
-
//异步超时发送消息 void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException;
-
//发送单向消息 void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException;
-
//选择指定队列同步发送消息 SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
//选择指定队列异步发送消息 void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
-
//选择指定队列单项发送消息 void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException;
-
//批量发送消息 SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;
DefaultMQProducer相关的属性
producerGroup:生产者所属组
//AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; Will be created at broker when isAutoCreateTopicEnable
createTopicKey:默认Topic
//private volatile int defaultTopicQueueNums = 8;
defaultTopicQueueNums:默认主题在每一个Broker队列数量8
sendMsgTimeout:发送消息默认超时时间,默认3s
compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k
retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为2,总共执行3次
retryTimesWhenSendAsyncFailed:异步方法发送消息重试次数,默认为2,总共执行3次
retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false
maxMessageSize:允许发送的最大消息长度,默认为4M
DefaultMQProducerImpl启动流程分析
消费者Consumer的启动和生产者Producer启动很类似,可以和Producer启动进行对比。
不同之处是消费端的PullMessageService、RebalanceService才有真正作用。
这2个定时任务对于producer是无用的,但是这两个定时任务是消费端的核心。
参考:segmentfault.com/a/119000002…
代码:DefaultMQProducerImpl#start
//检查生产者组是否满足要求
this.checkConfig();
//更改当前instanceName为进程ID
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
//获得MQ客户端实例 MQClientInstance
this.mQClientFactory = MQClientManager.getInstance()
.getAndCreateMQClientInstance
(this.defaultMQProducer, rpcHook);
一个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表。
同一个clientId只会创建一个MQClientInstance。
这也意味着同一台服务器,启动多个生产者、消费者也只有一个MQClientInstance。
ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable
= new ConcurrentHashMap<String,MQClientInstance>();
MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
代码:DefaultMQProducerImpl#start
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//启动生产者
if (startFactory) {
mQClientFactory.start();
}
DefaultMQPullConsumerImpl.start() (org.apache.rocketmq.client.impl.consumer)
DefaultMQAdminExtImpl.start() (org.apache.rocketmq.tools.admin)
DefaultMQPushConsumerImpl.start() (org.apache.rocketmq.client.impl.consumer)
DefaultMQProducerImpl.start(boolean) (org.apache.rocketmq.client.impl.producer)
//上面的类都调用的该方法 所以这方法不是只有生产者调用
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks 2分钟一次
//更新NameServer地址,该地址要求返回结果为一个ip列表,以;隔开,
//如果获取回来的地址跟现有的地址不一致则会更新缓存的NameServer地址列表。
//解析出来的地址列表用于根据NettyRemotingClient内部持有的变量:
this.startScheduledTask();
// 启动拉取消息线程
this.pullMessageService.start();
// 该方法用于启动rebalance任务。
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
DefaultMQProducerImpl启动源码
DefaultMQProducerImpl#start()
//启动producer
public void start() throws MQClientException {
//注意这里是个true 代表首先要启动mQClientFactory
this.start(true);
}
DefaultMQProducerImpl#start(boolean)
//第一次进来 startFactory = true
public void start(final boolean startFactory) throws MQClientException {
//初始默认是ServiceState.CREATE_JUST
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
//检查生产者组命名规范 校验如下:
//不能为空
//最大长度不能超过255
//不能叫 DEFAULT_PRODUCER
//正则校验: ^[%|a-zA-Z0-9_-]+$
this.checkConfig();
//更改当前instanceName为进程ID
//主要是通过UtilAll.getPid()
//UtilAll.getPid() 则是调用
// ManagementFactory.getRuntimeMXBean().getName()
//name的格式是 pid@hostname
//提取 pid@hostname 中的 pid
if(!this.defaultMQProducer.getProducerGroup().equals
(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
//通过MQClientManager.getInstance()获得MQ客户端实例 MQClientInstance
//MQClientManager.getInstance()返回的是MQClientManager
//MQClientManager使用了单例模式
//MQClientManager.getInstance()
// 1.首先拼接 IP + @ + PID 如172.18.95.215@5444 作为clientId
// 2.MQClientInstance instance = this.factoryTable.get(clientId);
// 3.如果为空则创建MQClientInstance
// 4.放入:this.factoryTable.putIfAbsent(clientId, instance);
//这样可以保证同一个JVM用一个MQClientInstance,比如启动生产者和消费者在1个main
// 下次在启动MQClientInstance会判断状态是RUNNING,会直接break。跳出switch。
this.mQClientFactory = MQClientManager.getInstance()
.getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//注册当前生产者到到MQClientInstance管理中,方便后续调用网路请求
//如果存在同名的producer registerOK 返回false
boolean registerOK
= mQClientFactory
.registerProducer
(this.defaultMQProducer.getProducerGroup(), this);
//如果存在同名的producer抛出异常
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException
("The producer group has created use another name please");
}
//KEY = TBW102
//VALUE = TopicPublishInfo
this.topicPublishInfoTable
.put(this.defaultMQProducer.getCreateTopicKey(),
new TopicPublishInfo());
if (startFactory) {
//这里会调用
//org.apache.rocketmq.client.impl.factory.MQClientInstance#start
//启动很多服务
//具体看
//org.apache.rocketmq.client.impl.factory.MQClientInstance#start
//当MQClientInstance启动完毕会调用
//this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
//完成DefaultMQProducerImpl的启动
mQClientFactory.start();
}
//修改启动状态
//这样下次同一台机器的其他的producer启动时不需要在启动mQClientFactory
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException
("The producer service state not OK, maybe started once");
default:
break;
}
//发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
检查生产者组命名规范
MQClientInstance#checkConfig
//检查生产者组命名规范 校验如下:
//不能为空
//最大长度不能超过255
//不能叫 DEFAULT_PRODUCER
//正则校验: ^[%|a-zA-Z0-9_-]+$
private void checkConfig() throws MQClientException {
Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
if (null == this.defaultMQProducer.getProducerGroup()) {
throw new MQClientException("producerGroup is null", (Throwable)null);
} else if (this.defaultMQProducer.getProducerGroup().equals("DEFAULT_PRODUCER")) {
throw new MQClientException("producerGroup can not equal DEFAULT_PRODUCER, please specify another one.", (Throwable)null);
}
}
注册生产者,k:生产者名,v是生产者
MQClientInstance#registerProducer
//注册当前生产者到到MQClientInstance管理中,方便后续调用网路请求
//如果存在同名的producer registerOK 返回false
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}
创建单例MQClientInstance
MQClientManager#getAndCreateMQClientInstance
MQClientInstance不是对外应用类,也就是说用户不需要自己实例化使用他。并且,MQClientInstance的实例化并不是直接new后使用,而是通过MQClientManager这个类型。
MQClientManager是个单例类,使用饿汉模式设计保证线程安全。他的作用是提供MQClientInstance实例,RocketMQ认为,MQClientInstance的实例是可以复用的实例,只要client相关特征参数相同,就会复用一个MQClientInstance实例,我们可以看看源码
原文链接:blog.csdn.net/tales522/ar…
// private ConcurrentMap factoryTable =new ConcurrentHashMap();
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig,
RPCHook rpcHook) {
//构建客户端ID
String clientId = clientConfig.buildMQClientId();
//根据客户端ID或者客户端实例
MQClientInstance instance = this.factoryTable.get(clientId);
//实例如果为空就创建新的实例,并添加到实例表中
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
//添加到实例表中
//factoryTable 是1个 ConcurrentMap
//通过容器的方式使用单例
new ConcurrentHashMap<String, MQClientInstance>();
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
启动MQClientInstance
MQClientInstance#start
DefaultMQPullConsumerImpl.start() (org.apache.rocketmq.client.impl.consumer)
DefaultMQAdminExtImpl.start() (org.apache.rocketmq.tools.admin)
DefaultMQPushConsumerImpl.start() (org.apache.rocketmq.client.impl.consumer)
DefaultMQProducerImpl.start(boolean) (org.apache.rocketmq.client.impl.producer)
//上面的类都调用的该方法 所以这方法不是只有生产者调用
//这也解释了为什么producer启动时为什么启动了那么多服务
//因为不管是消费者还是生产者都需要从nameServer拉取服务地址列表、发送心跳
//因此启动时不管先启动消费者还是先启动生产者 这些操作都是必不可少的。
//如果这样那么为什么不放到一起执行呢?
//启动的时机不一样吧
public void start() throws MQClientException {
synchronized (this) {
//默认是 CREATE_JUST
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
//启动 NettyRemotingClient
this.mQClientAPIImpl.start();
// 有个判断null == this.clientConfig.getNamesrvAddr()
// 才会启动这个更新nameServer的地址的定时任务
// 2分钟更新一次nameServer的地址,该地址要求返回结果为一个ip列表,以;隔开
// 如果获取回来的地址跟现有的地址不一致则会更新缓存的NameServer地址列表。
// 30秒更新一次topic的路由信息 如果不一致 会做全量更新
// 30秒对Broker发送一次心跳检测,并将下线的broker删除
// 5秒持久化一次consumer的offset
// 1分钟调整一次线程池,这个定时任务其实什么都没有执行。
this.startScheduledTask();
// Start pull service 启动拉取消息线程
this.pullMessageService.start();
// 启动负载均衡服务 这里是一个Thread 所以看run方法
this.rebalanceService.start();
//注意这里是false
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException
("The Factory objec has created before, and failed.");
default:
break;
}
}
}
MQClientInstance#startScheduledTask
每30秒更新一次topic的路由信息
如果不一致做全量更新。都是map做的put操作。
- MQClientInstance的updateTopicRouteInfoFromNameServer方法首先从consumerTable及producerTable获取topicList,然后遍历topicList执行updateTopicRouteInfoFromNameServer,最后执行的是updateTopicRouteInfoFromNameServer(topic, false, null)
- 然后会执行mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3)获取topicRouteData然后与topicRouteTable中的TopicRouteData进行对比,先通过topicRouteDataIsChange判断是否有变化,没有的话再通过isNeedUpdateTopicRouteInfo进一步判断
- 若有变化则更新brokerAddrTable。
- 遍历producerTable执行impl.updateTopicPublishInfo(topic, publishInfo);
- 遍历consumerTable执行impl.updateTopicSubscribeInfo(topic, subscribeInfo)
- 最后将cloneTopicRouteData更新到topicRouteTable
每30秒对Broker发送一次心跳检测
每30秒更新本地路由删除下线broker
5秒持久化一次consumer的offset
private void startScheduledTask() {
// http://jmenv.tbsite.net:8080/rocketmq/nsaddr
// producer在初始化时通过向固定地址发送 httpGet请求,从而获得name server地址
// 一般都是提前配置好 不存在动态增减的情况.
// 因为是有个判断null == this.clientConfig.getNamesrvAddr() 才会启动这个定时任务
// 2分钟更新一次nameServer的地址,该地址要求返回结果为一个ip列表,以;隔开
// 如果获取回来的地址跟现有的地址不一致则会更新缓存的NameServer地址列表。
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 30秒从nameserver拉取一次topic的路由信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// this.clientConfig.getHeartbeatBrokerInterval()
// private int pollNameServerInterval = 1000 * 30;
// 30秒对Broker发送一次心跳检测,并将下线的broker删除,清理下线的broker
// 更新本地路由信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// this.clientConfig.getPersistConsumerOffsetInterval()
// private int persistConsumerOffsetInterval = 1000 * 5;
// 5秒持久化一次consumer的offset是给消费者用的
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 1分钟调整一次线程池,这个定时任务其实什么都没有执行。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
启动拉取消息线程
消息拉取与长轮询讲消费者的时候再专门分析
启动负载均衡服务
负载均衡服务讲消费者的时候再专门分析
生产者启动主要是创建单例MQClientInstance,然后启动MQClientInstance,启动MQClientInstance主要是启动下面几个重要的定时任务。
1.每30秒更新一次topic的路由信息
2.每30秒对Broker发送一次心跳检测
3.每30秒更新本地路由删除下线broker
4.5秒持久化一次consumer的offset
5.启动拉取消息定时任务
6.启动重新负载均衡定时任务
其中消息拉取与长轮询和负载均衡定时任务是消费者的核心定时任务,讲消费者的时候再讲。