消费者启动
消费者启动示例代码
package com.itheima.mq.rocketmq.base.producer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class ConsumerAndProducer {
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
//1.创建消息生产者producer,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 1; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg
= new Message("tyrant", "tag", ("Hello World -" + i).getBytes());
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus status = result.getSendStatus();
String msgId = result.getMsgId();
MessageQueue messageQueue = result.getMessageQueue();
System.out.println("结果:" + result );
System.out.println("状态:" + status);
System.out.println("messageQueue:" + messageQueue);
//线程睡1秒
TimeUnit.SECONDS.sleep(1);
}
//6.关闭生产者producer
// producer.shutdown();
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址
consumer.setConsumeMessageBatchMaxSize(10);
consumer.setConsumeThreadMin(2);
consumer.setConsumeThreadMax(60);
consumer.setNamesrvAddr("localhost:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("tyrant", "tag");
//设定消费模式:负载均衡|广播模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(msgs.size());
for (MessageExt msg : msgs) {
msg.setReconsumeTimes(3);
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
//可以设置消费失败的位置
// context.setAckIndex();
return null;
}
});
//5.启动消费者consumer
consumer.start();
}
}
消费者启动流程
消费者构造&初始化
1.指定消费者组,负载均衡策略,消费模式
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
public DefaultMQPushConsumer(String consumerGroup) {
//可以看到默认的负载均衡策略是平均
this(consumerGroup, (RPCHook)null, new AllocateMessageQueueAveragely());
}
public DefaultMQPushConsumer(String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.log = ClientLogger.getLog();
//默认集群策略
this.messageModel = MessageModel.CLUSTERING;
this.consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
this.consumeTimestamp =
UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - 1800000L);
this.subscription = new HashMap();
this.consumeThreadMin = 20;
this.consumeThreadMax = 64;
this.adjustThreadPoolNumsThreshold = 100000L;
this.consumeConcurrentlyMaxSpan = 2000;
this.pullThresholdForQueue = 1000;
this.pullThresholdSizeForQueue = 100;
this.pullThresholdForTopic = -1;
this.pullThresholdSizeForTopic = -1;
this.pullInterval = 0L;
this.consumeMessageBatchMaxSize = 1;
this.pullBatchSize = 32;
this.postSubscriptionWhenPull = false;
this.unitMode = false;
this.maxReconsumeTimes = -1;
this.suspendCurrentQueueTimeMillis = 1000L;
this.consumeTimeout = 15L;
this.traceDispatcher = null;
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
this.defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
2.消费者订阅主题,设置消息过滤tag
//3.订阅主题Topic和Tag
consumer.subscribe("tyrant", "tag");
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
}
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
//解析subExpression封装为subscriptionData
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData
(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
//将解析完成的subscriptionData放入subscriptionInner
//注意subscriptionInner 后面还会用到
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception var4) {
throw new MQClientException("subscription exception", var4);
}
}
消费者启动流程
消费端启动和producer启动很类似,可以和producer启动进行对比。
不同之处是消费端的PullMessageService、RebalanceService才有真正作用,而producer该两个服务线程是无用的,这两个服务线程也是消费端的核心。
public DefaultMQPushConsumer(final String namespace, final String consumerGroup,
RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
//注意这里默认构造的是defaultMQPushConsumerImpl
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
//this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
//向defaultMQPushConsumerImpl的rebalanceImpl的subscriptionInner放入数据
//key是topic value是subscriptionData
//"tyrant" -> "SubscriptionData [classFilterMode=false, topic=tyrant, subString=tag, //tagsSet=[tag], codeSet=[114586], subVersion=1617670290656, expressionType=TAG]"
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData
= FilterAPI.buildSubscriptionData
(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
/***
可以使用该方法指定多个topic 和 对应的tag。只不过这里设置的subscribtion,在copySubscription中也会执行向defaultMQPushConsumerImpl的rebalanceImpl的subscriptionInner放入数据的操作
**/
public void setSubscription(Map<String, String> subscription) {
Map<String, String> subscriptionWithNamespace = new HashMap<String, String>();
for (String topic : subscription.keySet()) {
subscriptionWithNamespace
.put(withNamespace(topic), subscription.get(topic));
}
this.subscription = subscriptionWithNamespace;
}
DefaultMQPushConsumerImpl#start
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
//注意这里的start方法和
//注意这里是个true 代表首先要启动mQClientFactory
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
//检查消费组命名规范 校验如下:
//不能为空
//最大长度不能超过255
//不能叫 DEFAULT_CONSUMER
//正则校验: ^[%|a-zA-Z0-9_-]+$
this.checkConfig();
//1.构建topic和tag(subscription)之间的映射关系
//2.设置消息监听器
//#进入copySubscription
this.copySubscription();
//如果是集群模式
//更改当前instanceName为进程ID
//主要是通过UtilAll.getPid()
//UtilAll.getPid() 则是调用
//ManagementFactory.getRuntimeMXBean().getName()
//name的格式是 pid@hostname
//提取 pid@hostname 中的 pid
if (this.defaultMQPushConsumer.getMessageModel()
== MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
//MQClientManager使用了单例模式通过MQClientManager.getInstance()返回单例
//MQClientManager.getInstance()返回的是MQClientManager
//getAndCreateMQClientInstance()返回的是MQ客户端实例 MQClientInstance
// 1.首先拼接 IP + @ + PID 如172.18.95.215@5444 作为clientId
// 2.MQClientInstance instance = this.factoryTable.get(clientId);
// 3.如果为空则创建MQClientInstance
// 4.放入:this.factoryTable.putIfAbsent(clientId, instance);
// 这样可以保证同一个JVM用的是一个MQClientInstance,
// 下次在启动MQClientInstance会判断状态是RUNNING,会直接break。跳出switch。
this.mQClientFactory
= MQClientManager
.getInstance()
.getAndCreateMQClientInstance
(this.defaultMQPushConsumer, this.rpcHook);
//注意这里都是this.rebalanceImpl在设置属性
//设置消费组
this.rebalanceImpl
.setConsumerGroup
(this.defaultMQPushConsumer.getConsumerGroup());
//设置消费模式
this.rebalanceImpl
.setMessageModel
(this.defaultMQPushConsumer.getMessageModel());
//设置分配消息的策略 默认是AllocateMessageQueueAveragely
this.rebalanceImpl
.setAllocateMessageQueueStrategy
(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
//设置MQ客户端实例
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper
.registerFilterMessageHook(filterMessageHookList);
//如果不是 第一次启动 首先获取offsetStore
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
//消息消费广播模式,将消费进度保存在本地
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore
(this.mQClientFactory,
this.defaultMQPushConsumer
.getConsumerGroup());
break;
//消息消费集群模式,将消费进度保存在远端Broker
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore
(this.mQClientFactory, this.defaultMQPushConsumer
.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
//加载offsetStore
this.offsetStore.load();
//如果是顺序消费
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
//设置顺序消费标志
this.consumeOrderly = true;
//强转消费消息服务为MessageListenerOrderly
this.consumeMessageService
=new ConsumeMessageOrderlyService
(this, (MessageListenerOrderly)
this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof
MessageListenerConcurrently){
//设置并发消费标志
this.consumeOrderly = false;
//强转消费消息服务为MessageListenerConcurrently
this.consumeMessageService
=new ConsumeMessageConcurrentlyService
(this, (MessageListenerConcurrently)
this.getMessageListenerInner());
}
//启动消费消息服务
//consumeMessageService.start()
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer
(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//这里会调用
//org.apache.rocketmq.client.impl.factory.MQClientInstance#start
//启动很多服务
//当MQClientInstance启动完毕会调用
//this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
//完成DefaultMQProducerImpl的启动
mQClientFactory.start();
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
3.订阅重试主题名称:%RETRY%+消费组名
只是订阅,但是重试主题还没有创建。只有存在需要重试的消息的时候才会创建重试的主题。
private void copySubscription() throws MQClientException {
try {
// Map<String /* topic */, String /* sub expression */> subscription
// subscription: key是topic value是tag
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
//这里的sub不是空 但是size = 0
//可以直接跳过
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
//构建subscriptionData
SubscriptionData subscriptionData
= FilterAPI.buildSubscriptionData
(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subString);
//放入Map key是topic value是subscriptionData
this.rebalanceImpl.getSubscriptionInner()
.put(topic, subscriptionData);
}
}
//如果是空 设置监听器
if (null == this.messageListenerInner) {
this.messageListenerInner = this
.defaultMQPushConsumer
.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
//如果是广播模式 广播模式不会重试
case BROADCASTING:
break;
//如果是集群模式
case CLUSTERING:
//构建重试组topic名称 %RETRY% + 消费组名称 注意是消费组名
//如果我的消费组是 group1
//那么重试topic名称 %RETRY%group1
final String retryTopic = MixAll.getRetryTopic
(this.defaultMQPushConsumer.getConsumerGroup());
//构建重试主体的subscriptionData
//SubscriptionData.SUB_ALL 值是 *即订阅重试主题的所有消息
//解析表达式封装为 subscriptionData
SubscriptionData subscriptionData
= FilterAPI.buildSubscriptionData
(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
//向defaultMQPushConsumerImpl的rebalanceImpl
//的subscriptionInner放入数据
//在订阅原始主题的时候我们也往这里放入过数据
this.rebalanceImpl.getSubscriptionInner()
.put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
4.创建MQClientInstance,设置负载均衡服务的属性
//MQClientManager使用了单例模式通过MQClientManager.getInstance()返回单例
//MQClientManager.getInstance()返回的是MQClientManager
//MQClientManager也是单例1个jvm只有1个
//getAndCreateMQClientInstance()返回的是MQ客户端实例 MQClientInstance
// 1.首先拼接 IP + @ + PID 如172.18.95.215@5444 作为clientId
// 2.MQClientInstance instance = this.factoryTable.get(clientId);
// 3.如果为空则创建MQClientInstance
// 4.放入:this.factoryTable.putIfAbsent(clientId, instance);
// 这样可以保证同一个JVM用的是一个MQClientInstance
// 也就是一个JVM进程分别启动生产者和消费者,使用的是一个MQClientInstance
// 下次在启动MQClientInstance会判断状态是RUNNING,会直接break。跳出switch。
this.mQClientFactory
= MQClientManager
.getInstance()
.getAndCreateMQClientInstance
(this.defaultMQPushConsumer, this.rpcHook);
//注意这里都是this.rebalanceImpl在设置属性
//设置消费组
this.rebalanceImpl
.setConsumerGroup
(this.defaultMQPushConsumer.getConsumerGroup());
//设置消费模式
this.rebalanceImpl
.setMessageModel
(this.defaultMQPushConsumer.getMessageModel());
//设置分配消息的策略 默认是AllocateMessageQueueAveragely
this.rebalanceImpl
.setAllocateMessageQueueStrategy
(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
//设置MQ客户端实例
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
5.获取offsetStore,加载消费进度
广播:本地offsetStore。
比如:C:\Users\彭方亮.rocketmq_offsets
集群:远程offsetStore。
比如:D:\Program Files\rocketmq-all-4.3.0-bin-release\bin\store\config\consumerOffset.json
{
"offsetTable":{
"%RETRY%group1@group1":{0:238
},
//意思是topic是tyrant
//消费者组是group1
//消费的进度是 queueId为0的消费到了9 1消费到了7
//这里是下标 因为是定长20字节
"tyrant@group1":{0:9,1:7,2:6,3:8,4:8,5:6,6:6,7:7
}
}
}
6.根据监听器创建并发消费服务或者顺序消费服务
消息服务: ConsumeMessageOrderlyService&ConsumeMessageConcurrentlyService
默认其实就是并发消费
//如果不是 第一次启动 首先获取offsetStore实例
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.faultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
//消息消费广播模式,将消费进度保存在本地
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore
(this.mQClientFactory,
this.defaultMQPushConsumer
.getConsumerGroup());
break;
//消息消费集群模式,将消费进度保存在远端Broker
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore
(this.mQClientFactory, this.defaultMQPushConsumer
.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
//加载offsetStore
//本地是
//System.getProperty("user.home") + File.separator +
// ".rocketmq_offsets");
//远程是
// return rootDir + File.separator + "config" + File.separator +
// "consumerOffset.json";
//远程不在这里加载
this.offsetStore.load();
//如果是顺序消费
if (this.getMessageListenerInner()
instanceof MessageListenerOrderly) {
//设置顺序消费标志
this.consumeOrderly = true;
//强转消费消息服务为MessageListenerOrderly
this.consumeMessageService
=new ConsumeMessageOrderlyService
(this, (MessageListenerOrderly)
this.getMessageListenerInner());
} else if (this.getMessageListenerInner()
instanceof MessageListenerConcurrently){
//设置并发消费标志
this.consumeOrderly = false;
//强转消费消息服务为MessageListenerConcurrently
this.consumeMessageService
=new ConsumeMessageConcurrentlyService
(this, (MessageListenerConcurrently)
this.getMessageListenerInner());
}
consumeMessageService.start
因为消费方式分为并发消费和顺序消费,所以这里根据消费方式的不同选择的实现类也不同。
下面的消费方式是并发消费。
下面的消费方式是并发消费。
下面的消费方式是并发消费。
ConsumeMessageConcurrentlyService.start
15分钟执行一次:清理超时消息并发回broker★
// private long consumeTimeout = 15 分钟
// 15分钟执行一次清理过期消息任务
public void start() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanExpireMsg();
}
},
this.defaultMQPushConsumer.getConsumeTimeout(),
this.defaultMQPushConsumer.getConsumeTimeout(),
TimeUnit.MINUTES);
}
ConsumeMessageConcurrentlyService.cleanExpireMsg
private void cleanExpireMsg() {
Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
this.defaultMQPushConsumerImpl
.getRebalanceImpl()
.getProcessQueueTable()
.entrySet()
.iterator();
//遍历ProcessQueue
while (it.hasNext()) {
Map.Entry<MessageQueue, ProcessQueue> next = it.next();
ProcessQueue pq = next.getValue();
//清理ProcessQueue中的过期数据
pq.cleanExpiredMsg(this.defaultMQPushConsumer);
}
}
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
//如果是顺序消费直接返回 不需要清理卡住进度的消息
//这里的值是在start方法中判断ConsumeMessageOrderlyService后设置为true
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
return;
}
//每次最多取16个消息
int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
for (int i = 0; i < loop; i++) {
MessageExt msg = null;
try {
this.lockTreeMap.readLock().lockInterruptibly();
try {
//如果msgTreeMap中还有元素
//且当前时间减去-第一个消息开始消费的时间大于15分钟
if (!msgTreeMap.isEmpty() &&
System.currentTimeMillis() - Long.parseLong
(MessageAccessor.getConsumeStartTimeStamp
(msgTreeMap.firstEntry().getValue()))>
pushConsumer.getConsumeTimeout() * 60 * 1000) {
msg = msgTreeMap.firstEntry().getValue();
} else {
break;
}
} finally {
this.lockTreeMap.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
}
try {
//把消息重新发送给broker,设置延迟级别=3
pushConsumer.sendMessageBack(msg,3);
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
if (!msgTreeMap.isEmpty() &&
msg.getQueueOffset() == msgTreeMap.firstKey()) {
try {
//移除此消息
removeMessage(Collections.singletonList(msg));
} catch (Exception e) {
log.error("send expired msg exception", e);
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
}
} catch (Exception e) {
log.error("send expired msg exception", e);
}
}
}
思考一下:什么时候会出现消息消费超时?
保证消费成功
PushConsumer为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。消费的时候,我们需要注入一个消费回调,具体sample代码如下:
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
doMyJob();//执行真正消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息 (默认是1条) 是消费完成的。
如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。
为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重新发回Broker。
发回的topic不是原topic而是这个消费组的RETRY topic,默认的延迟等级是3,即10秒,业务可设置后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。
1.如果业务的回调抛出异常那么认为消费失败。当成ConsumeConcurrentlyStatus.RECONSUME_LATER处理。
2.当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有RECONSUME_LATER的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。
启动的时候从哪里消费
当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。
如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:
//默认策略,从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_LAST_OFFSET
//从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
CONSUME_FROM_FIRST_OFFSET
//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
CONSUME_FROM_TIMESTAMP
所以,社区中经常有人问:“为什么我设了CONSUME_FROM_LAST_OFFSET,历史的消息还是被消费了”? 原因就在于只有全新的消费组才会使用到这些策略,老的消费组都是按已经存储过的消费进度继续消费。
对于老消费组想跳过历史消息可以采用以下3种方法:
1.代码按照日期判断,太老的消息直接return CONSUME_SUCCESS过滤。
2.代码判断消息的offset和MAX_OFFSET相差很远,认为是积压了很多,直接return CONSUME_SUCCESS过滤。
3.消费者启动前,先调整该消费组的消费进度,再开始消费。可以人工使用命令resetOffsetByTime,或调用内部的运维接口,祥见ResetOffsetByTimeCommand.java
消息ACK机制
RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。
每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度。
但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值,如下图:
这钟方式和传统的一条message单独ack的方式有本质的区别。在性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。
在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才会一下子更新到2200。
在这种设计下,就有消费大量重复的风险。
如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。
这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。
对于这个场景,3.2.6之前的RocketMQ无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。
实际上,从源码的角度上看,RocketMQ可能是考虑过这个问题的,截止到3.2.6的版本的源码中,可以看到为了缓解这个问题的影响面,DefaultMQPushConsumer中有个配置consumeConcurrentlyMaxSpan
private int consumeConcurrentlyMaxSpan = 2000;
这个值默认是2000,当RocketMQ发现本地缓存的消息的最大值-最小值差距大于这个值(2000)的时候,会触发流控——也就是说如果头尾都卡住了部分消息,达到了这个阈值就不再拉取消息。
但作用实际很有限,像刚刚这个例子,2101的消费是死循环,其他消费非常正常的话,是无能为力的。一旦退出,在不人工干预的情况下,2101后所有消息全部重复。
Ack卡进度解决方案
对于卡消费进度的问题,最显而易见的解法是设定一个超时时间,达到超时时间的那个消费当作消费失败处理。
后来RocketMQ显然也发现了这个问题,而RocketMQ在3.5.8之后也就是采用这样的方案去解决这个问题。
1.在pushConsumer中 有一个consumeTimeout字段(默认15分钟),用于设置最大的消费超时时间。消费前会记录一个消费的开始时间,后面用于比对。
2.消费者启动的时候,会定期扫描所有消费的消息,达到这个timeout的那些消息,就会触发sendBack并ack的操作。这里扫描的间隔也是consumeTimeout(单位分钟)的间隔。
通过源码看这个方案,其实可以看出有几个不太完善的问题:
cleanExpireMsg();
- 消费timeout的时间非常不精确。由于扫描的间隔是15分钟,所以实际上触发的时候,消息是有可能卡住了接近30分钟(15*2)才被清理。
- 由于定时器一启动就开始调度了,中途这个consumeTimeout再更新也不会生效。
pushConsumer.sendMessageBack(msg, 3);
参考链接:https://blog.csdn.net/ph3636/article/details/90484951
public void sendMessageBack(MessageExt msg, int delayLevel)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
}
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
try {
String brokerAddr
= (null != brokerName) ?
this.mQClientFactory
.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
//发送消息到原Topic
this.mQClientFactory
.getMQClientAPIImpl()
.consumerSendMessageBack(brokerAddr,msg,
this.defaultMQPushConsumer.getConsumerGroup(),
delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
//当发送重试消息出现异常时 有可能是broker异常
//这个时候采用正常的发送流程,重新创建一个新消息,
//把topic改成重试组的topic,保存以前的消息id。
//默认最大重试次数为16,所以一个消息最多可以消费17次,默认设置延迟级别范围为3到18。
//为什么是16?因为18个等级,从3开始重试,18-3+1 =15
//发送到重试消费Topic 是%RETRY% + 消费组名 注意是消费组名
//我们思考一下为什么是消费者组名?
//A消费者组消费成功 B消费者组消费失败
//如果发回原topic就有问题了,A又会消费一次
Message newMsg
= new Message(MixAll.getRetryTopic(
this.defaultMQPushConsumer.getConsumerGroup()),
msg.getBody());
//当然第一次重试是没有属性源消息id的,
//这个时候设置当前消息的id为源消息id,
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId
(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
//将原真实Topic的属性赋值给重试topic
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty
(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
//设置重试次数加1,重试次数默认是0
MessageAccessor.setReconsumeTime
(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes
(newMsg, String.valueOf(getMaxReconsumeTimes()));
//重试次数默认是0 即 3 + 0 为 DelayTimeLevel 即 10秒后再次重试
//3-18 即 18-3+1 = 16 即重试16次
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
//发送消息到重试Topic
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
private int getMaxReconsumeTimes() {
// default reconsume times: 16
if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
return 16;
} else {
return this.defaultMQPushConsumer.getMaxReconsumeTimes();
}
}
MQClientAPIImpl#consumerSendMessageBack
//发送重试消息到原Topic,组装消息头类ConsumerSendMsgBackRequestHeader
//请求码为CONSUMER_SEND_MSG_BACK = 36
public void consumerSendMessageBack(
final String addr,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
final long timeoutMillis,
final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
ConsumerSendMsgBackRequestHeader requestHeader =
new ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand
.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
requestHeader.setOffset(msg.getCommitLogOffset());
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
#### ConsumeMessageOrderlyService#start
//首次延迟1秒启动 此后每20秒执行一次
public void start() {
if(MessageModel.CLUSTERING.equals(
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
ConsumeMessageOrderlyService#lockMQPeriodically
顺序消费要周期给自己分配到的队列加锁
public synchronized void lockMQPeriodically() {
if (!this.stopped) {
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
}
RebalanceImpl#lockAll
/***
通过ReblanceImp的lockAll方法,每隔一段时间定时锁住当前消费端正在消费的队列。
设置本地队列ProcessQueue的locked属性为true。
保证broker中的每个消息队列只对应一个消费端,防止重复消费
***/
public void lockAll() {
/***
从ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 中获取keySet
keySet中的key是MessageQueue
通过MessageQueue获取BrokerName 作为key,MessageQueue作为value放入
HashMap<String brokerName, Set<MessageQueue>>
***/
HashMap<String, Set<MessageQueue>> brokerMqs
=this.buildProcessQueueTableByBrokerName();
Iterator<Entry<String, Set<MessageQueue>>> it
= brokerMqs.entrySet().iterator();
//遍历brokerMqs 即 HashMap<String/* brokerName */, Set<MessageQueue>>
while (it.hasNext()) {
Entry<String, Set<MessageQueue>> entry = it.next();
final String brokerName = entry.getKey();
final Set<MessageQueue> mqs = entry.getValue();
if (mqs.isEmpty())
continue;
//获取brokerName对应的master节点
FindBrokerResult findBrokerResult = this
.mQClientFactory
.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
//master节点不为空
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.setMqSet(mqs);
try {
//批量对MessageQueue加锁
Set<MessageQueue> lockOKMQSet =this
.mQClientFactory
.getMQClientAPIImpl()
.lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
//遍历加锁成功的MessageQueue
for (MessageQueue mq : lockOKMQSet) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
if (!processQueue.isLocked()) {
log.info
("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
}
//给processQueue设置加锁状态为成功
processQueue.setLocked(true);
//设置上一次加锁时间
processQueue.setLastLockTimestamp
(System.currentTimeMillis());
}
}
//获取加锁失败的MessageQueue
for (MessageQueue mq : mqs) {
if (!lockOKMQSet.contains(mq)) {
ProcessQueue processQueue
= this.processQueueTable.get(mq);
if (processQueue != null) {
//设置加锁状态为失败
processQueue.setLocked(false);
}
}
}
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mqs, e);
}
}
}
}
7.启动MQClientInstance
MQClientInstance#start
DefaultMQPullConsumerImpl.start() (org.apache.rocketmq.client.impl.consumer)
DefaultMQAdminExtImpl.start() (org.apache.rocketmq.tools.admin)
DefaultMQPushConsumerImpl.start() (org.apache.rocketmq.client.impl.consumer)
DefaultMQProducerImpl.start(boolean) (org.apache.rocketmq.client.impl.producer)
//消费端启动和producer启动很类似,可以和producer启动进行对比。
//不同之处是消费端的PullMessageService、RebalanceService才有真正作用,而producer该两个服务线程是无用的,这两个服务线程也是消费端的核心。
//上面的类都调用的该方法 所以这方法不是只有生产者调用
//这也解释了为什么producer启动时为什么启动了那么多服务
//因为不管是消费者还是生产者都需要从nameServer拉取服务地址列表、发送心跳
//因此启动时不管先启动消费者还是先启动生产者 这些操作都是必不可少的。
//如果这样那么为什么不放到一起执行呢?
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
//和远程服务打交道的实例
this.mQClientAPIImpl.start();
// 2分钟更新一次nameServer的地址,该地址要求返回结果为一个ip列表,以;隔开, // 如果获取回来的地址跟现有的地址不一致则会更新缓存的NameServer地址列表。
// 30秒更新一次topic的路由信息
// 30秒对所有的Broker发送一次心跳检测,并将下线的broker删除
// 5秒持久化一次consumer的offset
// 1分钟调整一次线程池,这个定时任务其实什么都没有执行。
this.startScheduledTask();
// 启动拉取消息的线程
this.pullMessageService.start();
// 启动负载均衡服务 这里是一个Thread 所以看run方法
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
<!---->
同一个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表
/* key 为 clientId 即ip + @ + pid */
ConcurrentMap<String, MQClientInstance> factoryTable =
new ConcurrentHashMap<String,MQClientInstance>();
同一个clientId只会创建一个MQClientInstance。MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
this.startScheduledTask();
每隔2分钟更新一次nameServer的地址
该地址要求返回结果为一个ip列表,以;隔开。
如果获取回来的地址跟现有的地址不一致则会更新缓存的NameServer地址列表。
每隔30秒更新一次topic的路由信息
每隔30秒向所有的Broker发送1次心跳
每隔30秒将下线的broker删除
每隔5秒持久化一次消费进度
消费者:
@Override
public void persistConsumerOffset() {
try {
this.makeSureStateOK();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
this.offsetStore.persistAll(mqs);
} catch (Exception e) {
log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
}
}
<!---->
@Override
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
if (!mqs.isEmpty()) {
//遍历mq 循环更新消息进度
for (Map.Entry<MessageQueue, AtomicLong> entry :
this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
} else {
unusedMQ.add(mq);
}
}
}
}
if (!unusedMQ.isEmpty()) {
for (MessageQueue mq : unusedMQ) {
this.offsetTable.remove(mq);
log.info("remove unused mq, {}, {}", mq, this.groupName);
}
}
}
broker:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
return this.getConsumerListByGroup(ctx, request);
case RequestCode.UPDATE_CONSUMER_OFFSET:
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
default:
break;
}
return null;
}
<!---->
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
final UpdateConsumerOffsetRequestHeader requestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
<!---->
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
final long offset) {
// 默认消费的进度的key是topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(clientHost, key, queueId, offset);
}
<!---->
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
//现根据key:topic@group 获取到队列id对应的消费进度
//{0:1,1:1,2:1,3:1,4:1,5:2,6:1,7:1}
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
//如果是空,说明原来没有创建过就新建1个
if (null == map) {
map = new ConcurrentHashMap<Integer, Long>(32);
map.put(queueId, offset);
this.offsetTable.put(key, map);
} else {
//如果不为空那么就把queueid作为key offset作为进度持久化
Long storeOffset = map.put(queueId, offset);
if (storeOffset != null && offset < storeOffset) {
log.warn("update consumer offset less than store.");
}
}
}
<!---->
//tyrant@group1代表的是主题tyrant的消费者组group1的消费进度
//0:1代表的是queueid为0的消费到了在ConsumeQueue中下标为1的索引对应的消息
{
"offsetTable":{
"tyrant@group1":{0:1,1:1,2:1,3:1,4:1,5:2,6:1,7:1
},
"%RETRY%group1@group1":{0:1
}
}
}
每隔1分钟调整一次线程池
private void startScheduledTask() {
//这里的namesrvAddr不是null
//原因是我们设置了
//consumer.setNamesrvAddr("localhost:9876");
//这个定时任务会跳过
//如果没有指定namesrvAddr才会拉取
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
//更新路由元信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
//删除日志
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
//1.清理下线的broker 2.发送消费者和生产者的心跳信息到broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
this.pullMessageService.start();
启动拉取消息服务
因为PullMessageService是一个线程所以需要看run方法
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
默认使用的是DefaultMQPushConsumerImpl,也就是推模式。
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
this.rebalanceService.start();
启动负载均衡消费服务
消费端消息队列负载的核心功能方法是org.apache.rocketmq.client.impl.consumer.RebalanceImpl.updateProcessQueueTableInRebalance(String, Set, boolean),只解释该方法,其余方法看流程图看代码就很容易明白。
传入参数Set是经过负载后分配给当前消费端的mq集合,boolean表示是顺序消费true,并发消费false。
参考链接:blog.csdn.net/yulewo123/a…
服务负载均衡下篇文章专门讲。