消费消息
PullMessageService负责对消息队列进行消息拉取,从远端服务器拉取消息后将消息存储ProcessQueue消息队列处理队列中,然后调用ConsumeMessageService#submitConsumeRequest方法进行消息消费,使用线程池来消费消息,确保了消息拉取与消息消费的解耦。
ConsumeMessageService支持顺序消息和并发消息。
消息消费概述
消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费者组可订阅多个主题,消费组之间有负载均衡模式和广播模式两种消费模式。
集群模式,主题下的同一条消息只允许被同一个group中一个消费者消费。
广播模式,主题下的同一条消息,将被集群内的所有消费者消费一次。
消息服务器与消费者之间的消息传递也有两种模式:推模式、拉模式。
所谓的拉模式,是消费端主动拉消息,而推模式是消息达到消息服务器后,推送给消息消费者。
RocketMQ消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。
集群模式下,多个消费者如何对消息队列进行负载呢?
消息队列负载机制遵循一个通用思想:
一个消息队列同一个时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。
RocketMQ支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。
不支持消息全局顺序消费,如果要实现某一个主题的全局顺序消费,可以将该主题的队列数设置为1,牺牲高可用性。
集群模式和广播模式区别
对于失败消息,广播消费会丢弃。广播模式不支持消费失败重投。 集群消费会发回Broker重新消费;
如果是集群消费模式,会订阅重试主题消息 获取重试topic,规则为 RETRYGROUPTOPIC_PREFIX + consumerGroup,即:”%RETRY%”+消费组名。
为重试topic设置订阅关系,订阅所有的消息。 消费者启动的时候会自动订阅该重试主题,并参与该topic的消息队列负载过程。
顺序消费和并发消费区别
consumeMessageService 分为并发消费和顺序消费
顺序消费指同一时刻一个queue只有一个线程在消费,只让一个线程消费由加锁来实现,而顺序则由TreeMap来实现。
保证顺序,既要保证同一个queue的消息不会被其他消费者消费,还要保证同一个queue的消息不能被并发消费。
顺序消费跟并发消费最大的区别在于顺序消费对要处理的队列加锁确保同一队列同一时间只允许一个消费线程处理。
另一个区别在于ConsumeRequest
ConsumeMessageOrderlyService.ConsumeRequest
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
}
ConsumeMessageConcurrentlyService.ConsumeRequest
class ConsumeRequest implements Runnable {
private final List<MessageExt> msgs;
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
可以发现ConsumeMessageConcurrentlyService.ConsumeRequest中有msgs参数。
ConsumeMessageOrderlyService.ConsumeRequest没有msgs参数。
msgs就是我们的消息。为什么ConsumeMessageOrderlyService不需要消息呢?
因为顺序消费时一个queue拉取到32条消息则创建一个ConsumeRequest对象,提交到线程池。在ConsumeRequest.run方法中,从processQueue不断take offset最小的消息,按照消息的offset顺序一条一条地消费,直到TreeMap空。
并发消费时一个queue拉取到32条消息则创建32个ConsumeRequest对象,将每个ConsumeRequest提交到线程池中。
在ConsumeRequest.run方法中执行listener.consumeMessage(msgs, context);
顺序消费是一批消息对应一个ConsumeRequest对象,由一个线程处理,所以可以保证顺序性。
并发消费是一批消息对应多个ConsumeRequest对象,由多个线程处理,所以可以保证并发度。
消费失败
顺序消费,处理消息失败,对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 版会自动不断地进行消息重试(每次间隔时间为1秒,最大重试次数Interger.MAX_VALUE),这时,应用会出现消息消费被阻塞的情况。
因此,建议您使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
并发消费消费消息失败,会发送回broker重试,如果发送broker失败,则消费者继续本地重试消费。
并发消费消费消息失败就会发到broker,重试消息会被保存到SCHEDULE_TOPIC_XXXX主题对应的延时mq内,延迟队列就是各自的延时级别。当延迟时间到了以后会将延时队列的消息重新发回到%RETRY%consumegroup。
再次说明一下为什么发回的是%RETRY%consumegroup而不是%RETRY%topic。
因为可能有A B2个消费者组订阅了topic的消息。如果发回%RETRY%topic,A和B都会重试!但是如果谁消费失败了就发回到对应的%RETRY%consumegroup。
消费消息入口
DefaultMQPushConsumerImpl#pullMessage
在拉取消息的时候会给拉取消息的线程注册1个回调pullCallback。
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
在PullCallback的onSuccess方法中,有如下代码:
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
//拉取的消息列表
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
消费消息时,consumeMessageService是顺序消费还是并发消费。是根据MessageListenerInner判断
if (this.getMessageListenerInner()
instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService
(this, (MessageListenerOrderly)
this.getMessageListenerInner());
} else if (this.getMessageListenerInner()
instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService
(this, (MessageListenerConcurrently)
this.getMessageListenerInner());
}
ConsumeMessageConcurrentlyService#submitConsumeRequest
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
//private int consumeMessageBatchMaxSize = 1;
//消费者客户端的默认单次消费消息的个数,默认是1,可以设置
//重点:可以将该参数适量调大 提高消费速度
//重点:可以将该参数适量调大 提高消费速度
final int consumeBatchSize = this.defaultMQPushConsumer
.getConsumeMessageBatchMaxSize();
//如果本次拉取消息数量小于等于consumeMessageBatchMaxSize = 1
//即只拉取到一个消息
if (msgs.size() <= consumeBatchSize) {
//即待消费的消息集合只有1条消息、处理队列、消息队列包装为task对象
//即consumeRequest
ConsumeRequest consumeRequest
= new ConsumeRequest(msgs, processQueue, messageQueue);
try {
//把task对象丢入到消费线程池处理,这是多个线程并发执行,因此叫并发消费。
//消费端的线程池是ConsumeMessageConcurrentlyService.consumeExecutor
//默认是20个消费线程,如果业务代码消费消息速度慢
//可以在消费客户端进行设置较大的消费线程池 提高消费速度。
//可以在消费客户端进行设置较大的消费线程池 提高消费速度。
//可以在消费客户端进行设置较大的消费线程池 提高消费速度。
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
//消息默认会拉取32条 private int pullBatchSize = 32;
//可以将该参数适量调大 提高每次拉取消息的条数最终达到 提高消费速度
//可以将该参数适量调大 提高每次拉取消息的条数最终达到 提高消费速度
//注意外面是一层大循环
for (int total = 0; total < msgs.size(); ) {
//private int consumeMessageBatchMaxSize = 1;
//按照消费的批次个数设置1个消费线程要消费的消息集合,默认是1个消息
//消费者客户端的默认单次消费消息的个数,默认是1,可以设置
//重点:可以将该参数适量调大 提高消费速度
//重点:可以将该参数适量调大 提高消费速度
//构造1个List 用于存放本次批量消费的消息
List<MessageExt> msgThis =
new ArrayList<MessageExt>(consumeBatchSize);
//循环放入本次要消费的消息
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
//待消费的消息集合(默认一条消息)、处理队列、消息队列包装为task对象
//ConsumeMessageConcurrentlyService.ConsumeRequest
//将本次待批量消费的消息提交到线程池
ConsumeRequest consumeRequest
= new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
//把task对象丢入到消费线程池处理,这是多个线程并发执行,因此叫并发消费。
//消费端的线程池是
//ConsumeMessageConcurrentlyService.consumeExecutor,
//默认是20个消费线程,如果业务代码消费消息速度慢,
//可以在消费客户端进行设置较大的消费线程池。
//原文链接:
//https://blog.csdn.net/yulewo123/article/details/103112786
//因为是提交到了线程池
//所以要看ConsumeMessageConcurrentlyService.ConsumeRequest#run
//this.consumeRequestQueue 是 LinkedBlockingQueue
/***
private int consumeThreadMin = 20;
private int consumeThreadMax = 20;
可以通过调整线程池大小优化消费速度
consumer.setConsumeThreadMax(30);
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
***/
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
//当线程池已经关闭或达到饱和(最大线程和队列都已满)状态时
//新提交的任务将会被拒绝策略拒绝。
//拒绝策略默认是抛出异常
//此处简单粗暴!
//再放进去重新消费!哈哈哈
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
ConsumeMessageConcurrentlyService.ConsumeRequest#run
@Override
public void run() {
//pq被抛弃,则不执行实际业务逻辑消费,
//被抛弃的原因比如消费端发生变化,rebalance线程重新负载了
if (this.processQueue.isDropped()) {
log.info
("the message queue not be able to consume, because it's dropped");
return;
}
//获取消息监听器
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService
.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace
(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService
.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace
(defaultMQPushConsumer.getNamespace());
consumeMessageContext
.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
//执行消息处理的钩子函数
ConsumeMessageConcurrentlyService.this
.defaultMQPushConsumerImpl
.executeHookBefore
(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor
.setConsumeStartTimeStamp
(msg, String.valueOf(System.currentTimeMillis()));
}
}
//业务代码执行消息消费
//调用应用程序消息监听器的consumeMessage方法,进入到具体的消息消费业务处理逻辑
status = listener
.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
//如果消费者返回的null
if (null == status) {
//有异常
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
//没有异常
returnType = ConsumeReturnType.RETURNNULL;
}
//消费时间大于1分钟
} else if (consumeRT >=
defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
//稍后消费
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
//消费成功
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageConcurrentlyService
.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps()
.put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
//如果返回的状态是null 稍后在消费
if (null == status) {
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
if (ConsumeMessageConcurrentlyService
.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
//执行消息处理后的钩子函数
ConsumeMessageConcurrentlyService
.this
.defaultMQPushConsumerImpl
.executeHookAfter(consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this
.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService
.this.consumerGroup, messageQueue.getTopic(),
consumeRT);
if (!processQueue.isDropped()) {
//点进去看
//处理消费结果
ConsumeMessageConcurrentlyService
.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result");
}
}
ConsumeMessageConcurrentlyService#processConsumeResult
处理消费结果
/*
* 处理消费结果
*/
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
//默认是Integer.MAX_VALUE
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager()
.incConsumeOKTPS(consumerGroup,
consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager()
.incConsumeFailedTPS
(consumerGroup,
consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS
(consumerGroup,
consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it");
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed
= new ArrayList<MessageExt>
(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//将失败的消息发回
//消息回发到broker,主题是%RETRY%+topicName,result为true表示回发成功
boolean result = this.sendMessageBack(msg, context);
//回发broker失败
if (!result) {
//设置msg的重复消费次数+1
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
//将回发broker失败的消息放入msgBackFailed
msgBackFailed.add(msg);
}
}
//如果msgBackFailed不为空
if (!msgBackFailed.isEmpty()) {
//从消费的消息里移除所有回发broker失败的消息
consumeRequest.getMsgs().removeAll(msgBackFailed);
//回发失败的 使用定时任务5秒以后再重试一次
this.submitConsumeRequestLater(msgBackFailed,
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue());
}
break;
default:
break;
}
//把消费成功的消息从ProcessQueue移除,并返回该批消息的最小offset
//返回processQueue上第一个消息的offset
long offset = consumeRequest
.getProcessQueue()
.removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
//更新消费偏移量
//注意集群消费模式RemoteBrokerOffsetStore#updateOffset
//注意广播消费模式LocalFileOffsetStore#updateOffset
this.defaultMQPushConsumerImpl
.getOffsetStore()
.updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
代码加的很清晰了,而且对于批量消费失败消费从消费失败位置如何进行回发也说明了。
对于单条消费且消费成功,ackIndex=0,那么i=1开始,则不进入for循环。
对于批量消费且消费成功,ackIndex=消费条数,那么i从消费的消息条数开始,因此也不进入for循环。
因此只要消费成功,无论单条消费or批量消费,都不进入for循环。
对于单条消费且消费失败,ackIndex=-1,那么i=0开始,则进入for循环,循环一次。
对于批量消费且消费失败,ackIndex=-1,那么i=0开始,则进入for循环,循环次数为消息的总条数。
因此只要消费失败,无论单条消费or批量消费,都会进入for循环。
问题:对于批量消费,比如32条,那么消费到第32条的时候消费失败了。那么这次消费的消息要全部回发到broker,然后消费端又重新消费了前面31条,这样是不好的。可否能从消费失败处回发呢?
可以的,在业务代码内设置ConsumeConcurrentlyContext.setAckIndex(int)即可
设置为消费失败的位置,这样消费失败就会从消费失败的消息位置进行回发到broker继而被消费端消费,就避免了批量消费重复消费成功的消息。也就是说在批量消费中如果设置了ConsumeConcurrentlyContext.ackIndex,那么就会从失败处开始重复消费,而非从该批消息的起始位置重新开始消费。
参考链接:blog.csdn.net/yulewo123/a…
removceMessage方法返回pq.msgTreeMap上保存的消息最小的offset,然后在updateOffset操作内把offset保存到消费客户端RemoteBrokerOffsetStore.offsetTable,消费客户端记录的消费offset在线程Thread [MQClientFactoryScheduledThread]每5s执行MQClientInstance.persistAllConsumerOffset()内保存到broker,发送命令是UPDATE_CONSUMER_OFFSET。
ProcessQueue#removeMessage
该方法有些难理解,重点是代码@1、代码@2处,对于每次消费成功,从pq移除该消息,如果pq还有消息(多个消费线程消费同一个ProcessQueue.msgTreeMap集合上保存的消息),那么返回最小的offset,如果pq上没有待消费的消息了,则返回ProcessQueue.queueOffsetMax(该属性保存的是一次拉取到的消息中的max offset),这样既避免了消息遗漏的情况,最终又保存到了消费最大offset的情况。
因此完美解决了 对于并发消费,消费msg1,msg2,msg3,它们的offset依次是增加的,在消费成功后,msg3先被消费完,继而保存offset的时候还是保存的msg1的offset,而非msg3.offset,这样避免了消费时候消息遗漏问题,但是会导致有重复消费的可能,当然rmq并不保证重复消费,由业务保证。
参考链接:blog.csdn.net/yulewo123/a…
public long removeMessage(final List<MessageExt> msgs) {
long result = -1;
final long now = System.currentTimeMillis();
try {
//加写锁,因为要对红黑树进行写操作
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
//msgTreeMap是每次pull到消息后保存的本次pull到的消息(默认一次拉取32条消息)
if (!msgTreeMap.isEmpty()) {
//代码@2 this.queueOffsetMax保存的是拉取到的32条消息中offset最大的,在
//ProcessQueue.putMessage(List<MessageExt>)设置,
//该方法是拉取到消息后在PullCallback内调用
result = this.queueOffsetMax + 1;//代码@2
int removedCnt = 0;
//遍历本次消费的消息集合,通常是一个消息,因为默认一次消费消费一个消息
for (MessageExt msg : msgs) {
//从红黑树移除被消费的消息
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
//说明被消费的消息在pq内
//计数器
if (prev != null) {
removedCnt--;
//pq.msgSize减去本次消费的消息size
msgSize.addAndGet(0 - msg.getBody().length);
}
}
//pq.msgCount消息数量减去被消费的消息数量
msgCount.addAndGet(removedCnt);
//pq上还有消息,说明拉取到的32条消息还没被消费完
//则返回拉取到的消息集合第一个消息的offset
//即最小offset。这也说明了为什么用红黑树保存拉取到的消息了,
//按照消息的offset排序,这次消费一条消息,返回最小的offset,
//这样避免了消息丢失(offset大的先被执行消费完毕)
if (!msgTreeMap.isEmpty()) {
//代码@1 返回pq上第一个消息的offset,即最小offset
result = msgTreeMap.firstKey();
}
}
} finally {
//finally 释放锁
this.lockTreeMap.writeLock().unlock();
}
} catch (Throwable t) {
log.error("removeMessage exception", t);
}
return result;
}
参考链接:blog.csdn.net/yulewo123/a…
ConsumeMessageOrderlyService#submitConsumeRequest
rocketmq的顺序消息并不是严格的顺序,只是分区顺序,把一个生产者产生的消息按照消息产生顺序存放到同一个mq上,那么这样就涉及到发送的时候对待存放的消息队列的选择了,因此需要实现MessageQueueSelector来选择要发送的消息队列,其他发送同普通消息发送。
顺序消息的定义:help.aliyun.com/document_de…
顺序消费的启动和并发消费的启动基本相同,在前面的图已经画出来了,顺序消费ConsumeMessageOrderlyService,task是ConsumeMessageOrderlyService.ConsumeRequest,顺序消费主要是要对消费的mq进行加锁,重新负载后还要对mq解锁。
在PullMessageService服务线程拉取到消息后,执行PullCallback.onSuccess()时同并发消费一样把拉取到的消息保存到ProcessQueue.msgTreeMap。
而后在ConsumeMessageOrderlyService.submitConsumeRequest(List<MessageExt>, ProcessQueue, MessageQueue, boolean)内把processQueue, messageQueue包装创建为task。
ConsumeMessageOrderlyService.ConsumeRequest丢入到顺序消费线程池(min20 max64)处理,顺序消费一个messagequeue只会在一个work线程上执行,因此一个消费客户端对于顺序消费是串行执行,不存在并发。
参考链接:blog.csdn.net/yulewo123/a…
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
ConsumeMessageOrderlyService.ConsumeRequest#run
顺序消费,消费失败的时候是被阻塞的,消费失败后,然后当前在运行的task就退出,消息又重新被保存到pq,新创建ConsumeRequest提交到线程池,默认延时1s后再次消费,这个延时时间可以业务代码内调整。
总结:顺序消费在业务代码要设置最大失败消费次数,达到这个次数,把消息保存到db,而后要返回消费成功,这样避免了消息回发到broker到死信队列,这样做比较方便。
以上是一个MessageQueue的消费情况,那么一个消费客户端对应消费多个mq呢?
PullMessageService拉取消息是按照PullRequest来拉取的,一个PullRequest表示一个消息队列mq,那么在一个消费端被分配了多个mq的时候,每个mq拉取到的消息都会丢入到线程池处理(无论并发消费or顺序消费都默认是20~64个线程),并发消费是多个消费线程一起执行。
这个容易理解,但是顺序消费必须要串行执行,是如何做的呢?
答案就在上面分析的ConsumeMessageOrderlyService.ConsumeRequest.run()代码的for循环上,顺序消费和并发消费不同的是并发消费的task是ConsumeMessageConcurrentlyService.ConsumeRequest,它包装了待消费的消息,因此可以在线程池中并发执行。
但是ConsumeMessageOrderlyService.ConsumeRequest是不包含待消费的消息,而是在运行过程中从processqueue上拉取消息然后进行消费,消费完毕后,接着再进行拉取消息,因此虽然顺序消费的线程池的work线程是多个,但是实际上一个mq的消费只会同时只有线程池中的一个work线程执行,因此做到了顺序消费是串行的。
ConsumeRequest#run
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("message queue not be able to consume, because it's dropped");
return;
}
//根据messageQueue获取锁
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
/*
this.processQueue.isLocked()被加锁了且锁时间未失效!this.processQueue.isLockExpired()
pq加锁并设置时间戳是在负载服务线程内设置的,还有是在计划任务
ConsumeMessageOrderlyService.lockMQPeriodically()设置
广播消费模式进入执行,or 集群消费模式且pq在有效加锁时间内进入
*/
//广播模式顺序消费哦
if(MessageModel.BROADCASTING.equals(
ConsumeMessageOrderlyService
.this
.defaultMQPushConsumerImpl
.messageModel())||
//pq失效,则退出for循环,不做消费
(this.processQueue.isLocked() && !this
.processQueue
.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not
be able to consume,because it's dropped");
break;
}
if (MessageModel.CLUSTERING
.equals(ConsumeMessageOrderlyService
.this
.defaultMQPushConsumerImpl
.messageModel())&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}");
ConsumeMessageOrderlyService
.this
.tryLockLaterAndReconsume(this.messageQueue,
this.processQueue, 10);
break;
}
//集群模式顺序消费 锁过期了
if (MessageModel.CLUSTERING.equals
(ConsumeMessageOrderlyService
.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn
("the message queue lock expired, so consume later");
ConsumeMessageOrderlyService
.this.tryLockLaterAndReconsume
(this.messageQueue, this.processQueue, 10);
break;
}
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService
.this.submitConsumeRequestLater
(processQueue, messageQueue, 10);
break;
}
////默认1,可以设置一次消费的消息数量
final int consumeBatchSize =
ConsumeMessageOrderlyService
.this
.defaultMQPushConsumer
.getConsumeMessageBatchMaxSize();
//从pq.msgTreeMap上移除offset最小的consumeBatchSize条消息返回
//(默认返回一个消息),同时把这些消息保存到
//pq.consumingMsgOrderlyTreeMap这个红黑树上
List<MessageExt> List<MessageExt> msgs =
this.processQueue
.takeMessags(consumeBatchSize);
defaultMQPushConsumerImpl
.resetRetryAndNamespace
(msgs, defaultMQPushConsumer
.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context
= new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this
.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService
.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setNamespace
(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps
(new HashMap<String, String>());
ConsumeMessageOrderlyService.this
.defaultMQPushConsumerImpl
.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
//pq的重入锁加锁,保证只有一个线程可以消费该pq上的该消息
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
break;
}
//业务代码执行消费消息
//业务消费逻辑
status =messageListener
.consumeMessage
(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
if (null == status
|| ConsumeOrderlyStatus.ROLLBACK == status
|| status==ConsumeOrderlyStatus
.SUSPEND_CURRENT_QUEUE_A_MOMENT ){
}
long consumeRT =
System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >=
defaultMQPushConsumer
.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeOrderlyStatus
.SUSPEND_CURRENT_QUEUE_A_MOMENT == status){
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeOrderlyStatus.SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageOrderlyService.this
.defaultMQPushConsumerImpl.hasHook()){
consumeMessageContext.getProps()
.put(MixAll.CONSUME_CONTEXT_TYPE,
returnType.name());
}
//消费返回null 稍后在消费
if (null == status) {
status = ConsumeOrderlyStatus
.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
//消费成功 执行hook
if (ConsumeMessageOrderlyService.this
.defaultMQPushConsumerImpl.hasHook()){
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService
.this
.defaultMQPushConsumerImpl
.executeHookAfter(consumeMessageContext);
}
//累加消费时间
ConsumeMessageOrderlyService
.this
.getConsumerStatsManager()
.incConsumeRT
(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
//点进去看
//点进去看
//点进去看
//处理消费的结果
//如果更新offset成功则继续从pq拉取消息消费(继续执行for循环)
//这一个顺序消费线程就消费完了pull到的所有消息
//返回的标志是 是否继续消费
continueConsume = ConsumeMessageOrderlyService
.this
.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
//顺序消费
} else {
if (this.processQueue.isDropped()) {
return;
}
//尝试加锁 加锁失败 会重新加锁然后 直到成功
ConsumeMessageOrderlyService
.this
.tryLockLaterAndReconsume
(this.messageQueue, this.processQueue, 100);
}
}
}
tryLockLaterAndReconsume
public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue,
final long delayMills) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
//向远程的broker上的mq队列加锁
boolean lockOK = ConsumeMessageOrderlyService.this.lockOneMQ(mq);
//加锁成功
//立刻发起请求
if (lockOK) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater
(processQueue, mq, 10);
} else {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater
(processQueue, mq,3000);
}
}
}, delayMills, TimeUnit.MILLISECONDS);
}
public synchronized boolean lockOneMQ(final MessageQueue mq) {
if (!this.stopped) {
return this.defaultMQPushConsumerImpl.getRebalanceImpl().lock(mq);
}
return false;
}
public boolean lock(final MessageQueue mq) {
//获取指定brokerNamde的broker对应的master节点
FindBrokerResult findBrokerResult = this.mQClientFactory
.findBrokerAddressInSubscribe
(mq.getBrokerName(), MixAll.MASTER_ID, true);
//
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
//设置消费组
requestBody.setConsumerGroup(this.consumerGroup);
//设置clientId
requestBody.setClientId(this.mQClientFactory.getClientId());
//设置加锁的mq
requestBody.getMqSet().add(mq);
try {
//向远程broker发起加锁请求
Set<MessageQueue> lockedMq =
this.mQClientFactory
.getMQClientAPIImpl()
.lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
//判断响应的被加锁的mq里是否包含当前mq
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
//包含说明加锁成功
boolean lockOK = lockedMq.contains(mq);
log.info("the message queue lock {}, {} {}",
lockOK ? "OK" : "Failed",
this.consumerGroup,
mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
}
return false;
}
服务器处理加锁请求
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.UPDATE_AND_CREATE_TOPIC:
return this.updateAndCreateTopic(ctx, request);
case RequestCode.DELETE_TOPIC_IN_BROKER:
return this.deleteTopic(ctx, request);
case RequestCode.GET_ALL_TOPIC_CONFIG:
return this.getAllTopicConfig(ctx, request);
case RequestCode.UPDATE_BROKER_CONFIG:
return this.updateBrokerConfig(ctx, request);
case RequestCode.GET_BROKER_CONFIG:
return this.getBrokerConfig(ctx, request);
case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
return this.searchOffsetByTimestamp(ctx, request);
case RequestCode.GET_MAX_OFFSET:
return this.getMaxOffset(ctx, request);
case RequestCode.GET_MIN_OFFSET:
return this.getMinOffset(ctx, request);
case RequestCode.GET_EARLIEST_MSG_STORETIME:
return this.getEarliestMsgStoretime(ctx, request);
case RequestCode.GET_BROKER_RUNTIME_INFO:
return this.getBrokerRuntimeInfo(ctx, request);
case RequestCode.LOCK_BATCH_MQ:
return this.lockBatchMQ(ctx, request);
case RequestCode.UNLOCK_BATCH_MQ:
return this.unlockBatchMQ(ctx, request);
case RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP:
return this.updateAndCreateSubscriptionGroup(ctx, request);
case RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG:
return this.getAllSubscriptionGroup(ctx, request);
case RequestCode.DELETE_SUBSCRIPTIONGROUP:
return this.deleteSubscriptionGroup(ctx, request);
case RequestCode.GET_TOPIC_STATS_INFO:
return this.getTopicStatsInfo(ctx, request);
case RequestCode.GET_CONSUMER_CONNECTION_LIST:
return this.getConsumerConnectionList(ctx, request);
case RequestCode.GET_PRODUCER_CONNECTION_LIST:
return this.getProducerConnectionList(ctx, request);
case RequestCode.GET_CONSUME_STATS:
return this.getConsumeStats(ctx, request);
case RequestCode.GET_ALL_CONSUMER_OFFSET:
return this.getAllConsumerOffset(ctx, request);
case RequestCode.GET_ALL_DELAY_OFFSET:
return this.getAllDelayOffset(ctx, request);
case RequestCode.INVOKE_BROKER_TO_RESET_OFFSET:
return this.resetOffset(ctx, request);
case RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS:
return this.getConsumerStatus(ctx, request);
case RequestCode.QUERY_TOPIC_CONSUME_BY_WHO:
return this.queryTopicConsumeByWho(ctx, request);
case RequestCode.REGISTER_FILTER_SERVER:
return this.registerFilterServer(ctx, request);
case RequestCode.QUERY_CONSUME_TIME_SPAN:
return this.queryConsumeTimeSpan(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER:
return this.getSystemTopicListFromBroker(ctx, request);
case RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE:
return this.cleanExpiredConsumeQueue();
case RequestCode.CLEAN_UNUSED_TOPIC:
return this.cleanUnusedTopic();
case RequestCode.GET_CONSUMER_RUNNING_INFO:
return this.getConsumerRunningInfo(ctx, request);
case RequestCode.QUERY_CORRECTION_OFFSET:
return this.queryCorrectionOffset(ctx, request);
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
case RequestCode.CLONE_GROUP_OFFSET:
return this.cloneGroupOffset(ctx, request);
case RequestCode.VIEW_BROKER_STATS_DATA:
return ViewBrokerStatsData(ctx, request);
case RequestCode.GET_BROKER_CONSUME_STATS:
return fetchAllConsumeStatsInBroker(ctx, request);
case RequestCode.QUERY_CONSUME_QUEUE:
return queryConsumeQueue(ctx, request);
default:
break;
}
return null;
}
private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
requestBody.getConsumerGroup(),
requestBody.getMqSet(),
requestBody.getClientId());
LockBatchResponseBody responseBody = new LockBatchResponseBody();
responseBody.setLockOKMQSet(lockOKMQSet);
response.setBody(responseBody.encode());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
final String clientId) {
Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
//假如2个实例对应的线程分别是线程1 线程2
//线程1 对 q1 q2 加锁成功 线程2对q3加锁成功
//对于线程1 lockedMqs = q1 q2 notLockedMqs = q3
//假如线程1 先获取到锁
for (MessageQueue mq : mqs) {
//判断是否之前加锁成功
//之前加锁成功的都放到lockedMqs
//之前没有枷锁的或者加锁失败的都放到notLockedMqs
if (this.isLocked(group, mq, clientId)) {
lockedMqs.add(mq);
} else {
notLockedMqs.add(mq);
}
}
//判断未加锁的不为空
if (!notLockedMqs.isEmpty()) {
try {
// private final Lock lock = new ReentrantLock();
// 防止多线程并发
this.lock.lockInterruptibly();
try {
ConcurrentHashMap<MessageQueue, LockEntry> groupValue
= this.mqLockTable.get(group);
if (null == groupValue) {
groupValue = new ConcurrentHashMap<>(32);
this.mqLockTable.put(group, groupValue);
}
for (MessageQueue mq : notLockedMqs) {
LockEntry lockEntry = groupValue.get(mq);
//线程1走到这里判断的肯定不是空
if (null == lockEntry) {
lockEntry = new LockEntry();
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
log.info("tryLockBatch, message queue not locked, I got it");
}
//判断是否是由线程1加的锁 结果是false
if (lockEntry.isLocked(clientId)) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
lockedMqs.add(mq);
continue;
}
//获取加锁成功的clientId
//用来打日志
String oldClientId = lockEntry.getClientId();
//如果过期了 默认自己加锁成功
if (lockEntry.isExpired()) {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn(
"tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
lockedMqs.add(mq);
continue;
}
log.warn(
"tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
}
} finally {
this.lock.unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}
}
return lockedMqs;
}
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
//双层map
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (groupValue != null) {
LockEntry lockEntry = groupValue.get(mq);
if (lockEntry != null) {
//相当于是锁重入 判断如下
//boolean eq = this.clientId.equals(clientId);
//return eq && !this.isExpired();
boolean locked = lockEntry.isLocked(clientId);
if (locked) {
//相当于设置了看门狗
//更新一下时间
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
}
return locked;
}
}
return false;
}
20s执行一次锁定分配给自己的消息消费队列
代码:ConsumeMessageOrderlyService#start
public void start() {
//如果消息模式为集群模式,启动定时任务,默认每隔20s执行一次锁定分配给自己的消息消费队列
if (MessageModel.CLUSTERING.equals
(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
rocketmq的顺序消息并不是严格的顺序,只是分区顺序,把一个生产者产生的消息按照消息产生顺序存放到同一个mq上,那么这样就涉及到发送的时候对待存放的消息队列的选择了,因此需要实现MessageQueueSelector来选择要发送的消息队列,其他发送同普通消息发送。
顺序消费的启动和并发消费的启动基本相同,在前面的图已经画出来了,顺序消费ConsumeMessageOrderlyService,task是ConsumeMessageOrderlyService.ConsumeRequest,顺序消费主要是要对消费的mq进行加锁,重新负载后还要对mq解锁。
在PullMessageService服务线程拉取到消息后,执行PullCallback.onSuccess()时同并发消费一样把拉取到的消息保存到ProcessQueue.msgTreeMap。
然后在ConsumeMessageOrderlyService.submitConsumeRequest(List<MessageExt>, ProcessQueue, MessageQueue, boolean)
内把processQueue, messageQueue
包装创建为task
ConsumeMessageOrderlyService.ConsumeRequest丢入到顺序消费线程池(min20 max64)处理,顺序消费一个messagequeue只会在一个work线程上执行,因此一个消费客户端对于顺序消费是串行执行,不存在并发。
参考链接:blog.csdn.net/yulewo123/a…
顺序消费,消费失败的时候是被阻塞的,消费失败后,然后当前在运行的task就退出,消息又重新被保存到pq,新创建ConsumeRequest提交到线程池,默认延时1s后再次消费,这个延时时间可以业务代码内调整。
总结:顺序消费在业务代码要设置最大失败消费次数,达到这个次数,把消息保存到db,而后要返回消费成功,这样避免了消息回发到broker到死信队列,这样做比较方便。
以上是一个MessageQueue的消费情况,那么一个消费客户端对应消费多个mq呢?
PullMessageService拉取消息是按照PullRequest来拉取的,一个PullRequest表示一个消息队列mq,那么在一个消费端被分配了多个mq的时候,每个mq拉取到的消息都会丢入到线程池处理(无论并发消费or顺序消费都默认是20~64个线程),并发消费是多个消费线程一起执行。
这个容易理解,但是顺序消费必须要串行执行,是如何做的呢?
答案就在上面分析的ConsumeMessageOrderlyService.ConsumeRequest.run()代码的for循环上,顺序消费和并发消费不同的是并发消费的task是ConsumeMessageConcurrentlyService.ConsumeRequest,它包装了待消费的消息,因此可以在线程池中并发执行。
但是ConsumeMessageOrderlyService.ConsumeRequest是不包含待消费的消息,而是在运行过程中从processqueue上拉取消息然后进行消费,消费完毕后,接着再进行拉取消息,因此虽然顺序消费的线程池的work线程是多个,但是实际上一个mq的消费只会同时只有线程池中的一个work线程执行,因此做到了顺序消费是串行的。
至此顺序消费写完。