27.RocketMQ之消费者启动源码

消费者启动

消费者启动示例代码

package com.itheima.mq.rocketmq.base.producer;
​
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.exception.RemotingException;
​
import java.util.List;
import java.util.concurrent.TimeUnit;
​
public class ConsumerAndProducer {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        //1.创建消息生产者producer,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3.启动producer
        producer.start();
​
        for (int i = 0; i < 1; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg
                    = new Message("tyrant", "tag", ("Hello World -" + i).getBytes());
            //5.发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus status = result.getSendStatus();
            String msgId = result.getMsgId();
            MessageQueue messageQueue = result.getMessageQueue();
            System.out.println("结果:" + result );
            System.out.println("状态:" + status);
            System.out.println("messageQueue:" + messageQueue);
            //线程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }
​
        //6.关闭生产者producer
       // producer.shutdown();
​
​
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setConsumeMessageBatchMaxSize(10);
        consumer.setConsumeThreadMin(2);
        consumer.setConsumeThreadMax(60);
        consumer.setNamesrvAddr("localhost:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("tyrant", "tag");
​
        //设定消费模式:负载均衡|广播模式
        consumer.setMessageModel(MessageModel.CLUSTERING);
​
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
​
            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(msgs.size());
                for (MessageExt msg : msgs) {
                    msg.setReconsumeTimes(3);
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                //可以设置消费失败的位置
                // context.setAckIndex();
                return null;
            }
        });
        //5.启动消费者consumer
        consumer.start();
    }
}
​

消费者启动流程

消费者构造&初始化

1.指定消费者组,负载均衡策略,消费模式

 //1.创建消费者Consumer,制定消费者组名
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
 public DefaultMQPushConsumer(String consumerGroup) {
     	//可以看到默认的负载均衡策略是平均
        this(consumerGroup, (RPCHook)null, new AllocateMessageQueueAveragely());
    }
   public DefaultMQPushConsumer(String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.log = ClientLogger.getLog();
       	//默认集群策略
        this.messageModel = MessageModel.CLUSTERING;
        this.consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
        this.consumeTimestamp = 	
            UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - 1800000L);
        this.subscription = new HashMap();
        this.consumeThreadMin = 20;
        this.consumeThreadMax = 64;
        this.adjustThreadPoolNumsThreshold = 100000L;
        this.consumeConcurrentlyMaxSpan = 2000;
        this.pullThresholdForQueue = 1000;
        this.pullThresholdSizeForQueue = 100;
        this.pullThresholdForTopic = -1;
        this.pullThresholdSizeForTopic = -1;
        this.pullInterval = 0L;
        this.consumeMessageBatchMaxSize = 1;
        this.pullBatchSize = 32;
        this.postSubscriptionWhenPull = false;
        this.unitMode = false;
        this.maxReconsumeTimes = -1;
        this.suspendCurrentQueueTimeMillis = 1000L;
        this.consumeTimeout = 15L;
        this.traceDispatcher = null;
        this.consumerGroup = consumerGroup;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        this.defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    }

2.消费者订阅主题,设置消息过滤tag

//3.订阅主题Topic和Tag
consumer.subscribe("tyrant", "tag");
public void subscribe(String topic, String subExpression) throws MQClientException {
        this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
    }
    public void subscribe(String topic, String subExpression) throws MQClientException {
            try {

                //解析subExpression封装为subscriptionData
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData
                    			(this.defaultMQPushConsumer.getConsumerGroup(),
                                 topic, subExpression);
                //将解析完成的subscriptionData放入subscriptionInner
                //注意subscriptionInner 后面还会用到
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                if (this.mQClientFactory != null) {
                    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                }


            } catch (Exception var4) {
                throw new MQClientException("subscription exception", var4);
            }
        }

消费者启动流程

image.png

消费端启动和producer启动很类似,可以和producer启动进行对比。

不同之处是消费端的PullMessageService、RebalanceService才有真正作用,而producer该两个服务线程是无用的,这两个服务线程也是消费端的核心。

     public DefaultMQPushConsumer(final String namespace, final String consumerGroup, 
     RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
            this.consumerGroup = consumerGroup;
            this.namespace = namespace;
            this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
            //注意这里默认构造的是defaultMQPushConsumerImpl
            defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
        }


    //this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
    //向defaultMQPushConsumerImpl的rebalanceImpl的subscriptionInner放入数据 
    //key是topic value是subscriptionData
    //"tyrant" -> "SubscriptionData [classFilterMode=false, topic=tyrant, subString=tag, //tagsSet=[tag], codeSet=[114586], subVersion=1617670290656, expressionType=TAG]"
    public void subscribe(String topic, String subExpression) throws MQClientException {
            try {
                SubscriptionData subscriptionData
                    		= FilterAPI.buildSubscriptionData
                    				(this.defaultMQPushConsumer.getConsumerGroup(),
                    topic, subExpression);
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                if (this.mQClientFactory != null) {
                    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                }
            } catch (Exception e) {
                throw new MQClientException("subscription exception", e);
            }
        }

    /***
    可以使用该方法指定多个topic 和 对应的tag。只不过这里设置的subscribtion,在copySubscription中也会执行向defaultMQPushConsumerImpl的rebalanceImpl的subscriptionInner放入数据的操作
    **/
    public void setSubscription(Map<String, String> subscription) {
            Map<String, String> subscriptionWithNamespace = new HashMap<String, String>();
            for (String topic : subscription.keySet()) {
                subscriptionWithNamespace
                    	.put(withNamespace(topic), subscription.get(topic));
            }

            this.subscription = subscriptionWithNamespace;
        }

DefaultMQPushConsumerImpl#start

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

    //注意这里的start方法和
    //注意这里是个true 代表首先要启动mQClientFactory
    public synchronized void start() throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:

                    this.serviceState = ServiceState.START_FAILED;
    				//检查消费组命名规范 校验如下:
                    //不能为空
                    //最大长度不能超过255
                    //不能叫 DEFAULT_CONSUMER
                    //正则校验: ^[%|a-zA-Z0-9_-]+$
                    this.checkConfig();
    				
                    //1.构建topic和tag(subscription)之间的映射关系
                    //2.设置消息监听器
                    //#进入copySubscription
                    this.copySubscription();
    				
                    //如果是集群模式
                    //更改当前instanceName为进程ID
                    //主要是通过UtilAll.getPid()
                    //UtilAll.getPid() 则是调用
                    //ManagementFactory.getRuntimeMXBean().getName()
                    //name的格式是 pid@hostname
                    //提取  pid@hostname 中的 pid
                    if (this.defaultMQPushConsumer.getMessageModel() 
                        == MessageModel.CLUSTERING) {
                        this.defaultMQPushConsumer.changeInstanceNameToPID();
                    }
                    //MQClientManager使用了单例模式通过MQClientManager.getInstance()返回单例
                    //MQClientManager.getInstance()返回的是MQClientManager
                    //getAndCreateMQClientInstance()返回的是MQ客户端实例 MQClientInstance


                    // 1.首先拼接 IP + @ + PID 如172.18.95.215@5444 作为clientId
                    // 2.MQClientInstance instance = this.factoryTable.get(clientId);
                    // 3.如果为空则创建MQClientInstance
                    // 4.放入:this.factoryTable.putIfAbsent(clientId, instance);	
                    // 这样可以保证同一个JVM用的是一个MQClientInstance,
                    // 下次在启动MQClientInstance会判断状态是RUNNING,会直接break。跳出switch。
                    this.mQClientFactory
                        				= MQClientManager
                        					.getInstance()
                        					.getAndCreateMQClientInstance
                       						(this.defaultMQPushConsumer, this.rpcHook);

                 	 //注意这里都是this.rebalanceImpl在设置属性
                  	//设置消费组  
                 	this.rebalanceImpl
                     .setConsumerGroup
                     (this.defaultMQPushConsumer.getConsumerGroup());
                    //设置消费模式  
                 	this.rebalanceImpl
                     .setMessageModel
                     (this.defaultMQPushConsumer.getMessageModel());
                     //设置分配消息的策略 默认是AllocateMessageQueueAveragely
                 	this.rebalanceImpl
                     .setAllocateMessageQueueStrategy
                     (this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                    //设置MQ客户端实例    
                    this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    				
                    this.pullAPIWrapper = new PullAPIWrapper(
                        mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                    this.pullAPIWrapper
                        .registerFilterMessageHook(filterMessageHookList);
    				
                    //如果不是 第一次启动 首先获取offsetStore 
                    if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                        this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                    } else {
                        switch (this.defaultMQPushConsumer.getMessageModel()) {
                            //消息消费广播模式,将消费进度保存在本地    
                            case BROADCASTING:
                                this.offsetStore = new LocalFileOffsetStore
                                    (this.mQClientFactory, 		                
                                     this.defaultMQPushConsumer
                                     	 .getConsumerGroup());
                                break;
                           //消息消费集群模式,将消费进度保存在远端Broker
                            case CLUSTERING:
                                this.offsetStore = new RemoteBrokerOffsetStore
                                    (this.mQClientFactory, this.defaultMQPushConsumer
                                     .getConsumerGroup());
                                break;
                            default:
                                break;
                        }
                        this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                    }
                    //加载offsetStore
                    this.offsetStore.load();
    				//如果是顺序消费
          if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
              			//设置顺序消费标志
                        this.consumeOrderly = true;
              			//强转消费消息服务为MessageListenerOrderly
                        this.consumeMessageService 
                            =new ConsumeMessageOrderlyService
                           		 (this, (MessageListenerOrderly) 
                             				this.getMessageListenerInner());
                        
           } else if (this.getMessageListenerInner() instanceof 
                      					MessageListenerConcurrently){ 
             //设置并发消费标志           
             this.consumeOrderly = false;
              //强转消费消息服务为MessageListenerConcurrently
             this.consumeMessageService
                 =new ConsumeMessageConcurrentlyService
                       (this, (MessageListenerConcurrently) 
                        this.getMessageListenerInner());
           }
          //启动消费消息服务
          //consumeMessageService.start()
    	  this.consumeMessageService.start();
         boolean registerOK = mQClientFactory.registerConsumer
             (this.defaultMQPushConsumer.getConsumerGroup(), this);
          if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        this.consumeMessageService.shutdown();
                        throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }
                    //这里会调用
                    //org.apache.rocketmq.client.impl.factory.MQClientInstance#start
                    //启动很多服务
                    //当MQClientInstance启动完毕会调用
                    //this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    //完成DefaultMQProducerImpl的启动
                    mQClientFactory.start();
        
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }

            this.updateTopicSubscribeInfoWhenSubscriptionChanged();
            this.mQClientFactory.checkClientInBroker();
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            this.mQClientFactory.rebalanceImmediately();
        }

3.订阅重试主题名称:%RETRY%+消费组名

只是订阅,但是重试主题还没有创建。只有存在需要重试的消息的时候才会创建重试的主题。

     private void copySubscription() throws MQClientException {
            try {

                // Map<String /* topic */, String /* sub expression */> subscription
                // subscription: key是topic value是tag
                Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
                //这里的sub不是空 但是size = 0 
                //可以直接跳过
                if (sub != null) {
                    for (final Map.Entry<String, String> entry : sub.entrySet()) {
                        final String topic = entry.getKey();
                        final String subString = entry.getValue();
                        //构建subscriptionData
                        SubscriptionData subscriptionData 
                            	= FilterAPI.buildSubscriptionData
                            		(this.defaultMQPushConsumer.getConsumerGroup(),
                            topic, subString);
                        //放入Map key是topic value是subscriptionData
                        this.rebalanceImpl.getSubscriptionInner()
                            			.put(topic, subscriptionData);
                    }
                }
    			
                //如果是空 设置监听器 
                if (null == this.messageListenerInner) {
                    this.messageListenerInner = this
                        						.defaultMQPushConsumer
                        						.getMessageListener();
                }

    			
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    //如果是广播模式    广播模式不会重试
                    case BROADCASTING:
                        break;


                        //如果是集群模式
                    case CLUSTERING:
                        //构建重试组topic名称  %RETRY% + 消费组名称 注意是消费组名
                        //如果我的消费组是 group1
                        //那么重试topic名称 %RETRY%group1
                        final String retryTopic = MixAll.getRetryTopic
                            (this.defaultMQPushConsumer.getConsumerGroup());
                        //构建重试主体的subscriptionData
                        //SubscriptionData.SUB_ALL 值是 *即订阅重试主题的所有消息
                        //解析表达式封装为 subscriptionData 
                        SubscriptionData subscriptionData 
                            	= FilterAPI.buildSubscriptionData
                            		(this.defaultMQPushConsumer.getConsumerGroup(),
                           						retryTopic, SubscriptionData.SUB_ALL);
                        //向defaultMQPushConsumerImpl的rebalanceImpl
                        //的subscriptionInner放入数据 
                        //在订阅原始主题的时候我们也往这里放入过数据
                        this.rebalanceImpl.getSubscriptionInner()
                            .put(retryTopic, subscriptionData);
                        break;
                    default:
                        break;
                }
            } catch (Exception e) {
                throw new MQClientException("subscription exception", e);
            }

        }

4.创建MQClientInstance,设置负载均衡服务的属性

    			//MQClientManager使用了单例模式通过MQClientManager.getInstance()返回单例
                    //MQClientManager.getInstance()返回的是MQClientManager
    				//MQClientManager也是单例1个jvm只有1个
                    //getAndCreateMQClientInstance()返回的是MQ客户端实例 MQClientInstance

                    // 1.首先拼接 IP + @ + PID 如172.18.95.215@5444 作为clientId
                    // 2.MQClientInstance instance = this.factoryTable.get(clientId);
                    // 3.如果为空则创建MQClientInstance
                    // 4.放入:this.factoryTable.putIfAbsent(clientId, instance);	
                    // 这样可以保证同一个JVM用的是一个MQClientInstance
    				// 也就是一个JVM进程分别启动生产者和消费者,使用的是一个MQClientInstance
                    // 下次在启动MQClientInstance会判断状态是RUNNING,会直接break。跳出switch。
                    this.mQClientFactory

                        				= MQClientManager
                        					.getInstance()
                        					.getAndCreateMQClientInstance
                       						(this.defaultMQPushConsumer, this.rpcHook);
                 	 //注意这里都是this.rebalanceImpl在设置属性
                  	//设置消费组  
                 	this.rebalanceImpl
                     .setConsumerGroup
                     (this.defaultMQPushConsumer.getConsumerGroup());
                    //设置消费模式  
                 	this.rebalanceImpl
                     .setMessageModel
                     (this.defaultMQPushConsumer.getMessageModel());
                     //设置分配消息的策略 默认是AllocateMessageQueueAveragely
                 	this.rebalanceImpl
                     .setAllocateMessageQueueStrategy
                     (this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                    //设置MQ客户端实例    
                    this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

5.获取offsetStore,加载消费进度

广播:本地offsetStore。

比如:C:\Users\彭方亮.rocketmq_offsets

集群:远程offsetStore。

比如:D:\Program Files\rocketmq-all-4.3.0-bin-release\bin\store\config\consumerOffset.json

    {
    	"offsetTable":{
    		"%RETRY%group1@group1":{0:238
    		},
            //意思是topic是tyrant
            //消费者组是group1
            //消费的进度是 queueId为0的消费到了9 1消费到了7 
            //这里是下标 因为是定长20字节
    		"tyrant@group1":{0:9,1:7,2:6,3:8,4:8,5:6,6:6,7:7
    		}
    	}
    }

6.根据监听器创建并发消费服务或者顺序消费服务

消息服务: ConsumeMessageOrderlyService&ConsumeMessageConcurrentlyService

默认其实就是并发消费

    				//如果不是 第一次启动 首先获取offsetStore实例
                    if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                        this.offsetStore = this.faultMQPushConsumer.getOffsetStore();
                    } else {
                        switch (this.defaultMQPushConsumer.getMessageModel()) {
                            //消息消费广播模式,将消费进度保存在本地    
                            case BROADCASTING:
                                this.offsetStore = new LocalFileOffsetStore
                                    (this.mQClientFactory, 		                
                                     this.defaultMQPushConsumer
                                     	 .getConsumerGroup());
                                break;
                           //消息消费集群模式,将消费进度保存在远端Broker
                            case CLUSTERING:
                                this.offsetStore = new RemoteBrokerOffsetStore
                                    (this.mQClientFactory, this.defaultMQPushConsumer
                                     .getConsumerGroup());
                                break;
                            default:
                                break;
                        }
                        this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                    }
                    //加载offsetStore
    				//本地是
    				//System.getProperty("user.home") + File.separator + 
    				//											   ".rocketmq_offsets");
    				//远程是
    				// return rootDir + File.separator + "config" + File.separator + 
    				//											   "consumerOffset.json";
                    //远程不在这里加载
    				this.offsetStore.load();
    	  //如果是顺序消费
          if (this.getMessageListenerInner() 
              			instanceof MessageListenerOrderly) {
              			//设置顺序消费标志
                        this.consumeOrderly = true;
              			//强转消费消息服务为MessageListenerOrderly
                        this.consumeMessageService
                            =new ConsumeMessageOrderlyService
                            (this, (MessageListenerOrderly) 
                             		this.getMessageListenerInner());
                        
           } else if (this.getMessageListenerInner() 
                      instanceof MessageListenerConcurrently){ 
             //设置并发消费标志           
             this.consumeOrderly = false;
              //强转消费消息服务为MessageListenerConcurrently
             this.consumeMessageService 
                 		=new ConsumeMessageConcurrentlyService
                       				(this, (MessageListenerConcurrently) 
                                     		this.getMessageListenerInner());
           }

consumeMessageService.start

因为消费方式分为并发消费和顺序消费,所以这里根据消费方式的不同选择的实现类也不同。

下面的消费方式是并发消费。

下面的消费方式是并发消费。

下面的消费方式是并发消费。

ConsumeMessageConcurrentlyService.start

15分钟执行一次:清理超时消息并发回broker★
    // private long consumeTimeout = 15 分钟
    // 15分钟执行一次清理过期消息任务
    public void start() {
            this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    cleanExpireMsg();
                }
            }, 
            this.defaultMQPushConsumer.getConsumeTimeout(), 		    
            this.defaultMQPushConsumer.getConsumeTimeout(), 
            TimeUnit.MINUTES);
     }
ConsumeMessageConcurrentlyService.cleanExpireMsg
    private void cleanExpireMsg() {
            Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
    			this.defaultMQPushConsumerImpl
                .getRebalanceImpl()
                .getProcessQueueTable()
                .entrySet()
                .iterator();
        	//遍历ProcessQueue
            while (it.hasNext()) {
                Map.Entry<MessageQueue, ProcessQueue> next = it.next();
                ProcessQueue pq = next.getValue();
                 //清理ProcessQueue中的过期数据
                pq.cleanExpiredMsg(this.defaultMQPushConsumer);
            }
        }


    public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
        	//如果是顺序消费直接返回 不需要清理卡住进度的消息
        	//这里的值是在start方法中判断ConsumeMessageOrderlyService后设置为true
            if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
                return;
            }
    		//每次最多取16个消息
            int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
            for (int i = 0; i < loop; i++) {
                MessageExt msg = null;
                try {
                    this.lockTreeMap.readLock().lockInterruptibly();
                    try {
                        //如果msgTreeMap中还有元素
                        //且当前时间减去-第一个消息开始消费的时间大于15分钟
                        if (!msgTreeMap.isEmpty() && 
      						System.currentTimeMillis() - Long.parseLong
                            	(MessageAccessor.getConsumeStartTimeStamp
                             			(msgTreeMap.firstEntry().getValue()))> 
                     pushConsumer.getConsumeTimeout() * 60 * 1000) {
                            msg = msgTreeMap.firstEntry().getValue();
                        } else {
                            break;
                        }
                    } finally {
                        this.lockTreeMap.readLock().unlock();
                    }
                } catch (InterruptedException e) {
                    log.error("getExpiredMsg exception", e);
                }


                try {
                    //把消息重新发送给broker,设置延迟级别=3
                    pushConsumer.sendMessageBack(msg,3);
                    try {
                        this.lockTreeMap.writeLock().lockInterruptibly();
                        try {
                            if (!msgTreeMap.isEmpty() && 
                                msg.getQueueOffset() == msgTreeMap.firstKey()) {
                                try {
                                    //移除此消息
                                    removeMessage(Collections.singletonList(msg));
                                } catch (Exception e) {
                                    log.error("send expired msg exception", e);
                                }
                            }
                        } finally {
                            this.lockTreeMap.writeLock().unlock();
                        }
                    } catch (InterruptedException e) {
                        log.error("getExpiredMsg exception", e);
                    }
                } catch (Exception e) {
                    log.error("send expired msg exception", e);
                }
            }
        }

思考一下:什么时候会出现消息消费超时?

保证消费成功

PushConsumer为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。消费的时候,我们需要注入一个消费回调,具体sample代码如下:

     consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                doMyJob();//执行真正消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息 (默认是1条) 是消费完成的。

如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。

为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重新发回Broker。

发回的topic不是原topic而是这个消费组的RETRY topic,默认的延迟等级是3,即10秒,业务可设置后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。

1.如果业务的回调抛出异常那么认为消费失败。当成ConsumeConcurrentlyStatus.RECONSUME_LATER处理。

2.当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有RECONSUME_LATER的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。

启动的时候从哪里消费

当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。

如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:

    //默认策略,从该队列最尾开始消费,即跳过历史消息
    CONSUME_FROM_LAST_OFFSET 
    //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
    CONSUME_FROM_FIRST_OFFSET 
    //从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
    CONSUME_FROM_TIMESTAMP

所以,社区中经常有人问:“为什么我设了CONSUME_FROM_LAST_OFFSET,历史的消息还是被消费了”? 原因就在于只有全新的消费组才会使用到这些策略,老的消费组都是按已经存储过的消费进度继续消费。

对于老消费组想跳过历史消息可以采用以下3种方法:

1.代码按照日期判断,太老的消息直接return CONSUME_SUCCESS过滤。

2.代码判断消息的offset和MAX_OFFSET相差很远,认为是积压了很多,直接return CONSUME_SUCCESS过滤。

3.消费者启动前,先调整该消费组的消费进度,再开始消费。可以人工使用命令resetOffsetByTime,或调用内部的运维接口,祥见ResetOffsetByTimeCommand.java

消息ACK机制

RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。

每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度。

但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值,如下图:

image.png

这钟方式和传统的一条message单独ack的方式有本质的区别。在性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。

在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才会一下子更新到2200。

在这种设计下,就有消费大量重复的风险。

如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。

这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。

对于这个场景,3.2.6之前的RocketMQ无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。

实际上,从源码的角度上看,RocketMQ可能是考虑过这个问题的,截止到3.2.6的版本的源码中,可以看到为了缓解这个问题的影响面,DefaultMQPushConsumer中有个配置consumeConcurrentlyMaxSpan

private int consumeConcurrentlyMaxSpan = 2000;

这个值默认是2000,当RocketMQ发现本地缓存的消息的最大值-最小值差距大于这个值(2000)的时候,会触发流控——也就是说如果头尾都卡住了部分消息,达到了这个阈值就不再拉取消息。

但作用实际很有限,像刚刚这个例子,2101的消费是死循环,其他消费非常正常的话,是无能为力的。一旦退出,在不人工干预的情况下,2101后所有消息全部重复。

Ack卡进度解决方案

对于卡消费进度的问题,最显而易见的解法是设定一个超时时间,达到超时时间的那个消费当作消费失败处理。

后来RocketMQ显然也发现了这个问题,而RocketMQ在3.5.8之后也就是采用这样的方案去解决这个问题。

1.在pushConsumer中 有一个consumeTimeout字段(默认15分钟),用于设置最大的消费超时时间。消费前会记录一个消费的开始时间,后面用于比对。

2.消费者启动的时候,会定期扫描所有消费的消息,达到这个timeout的那些消息,就会触发sendBack并ack的操作。这里扫描的间隔也是consumeTimeout(单位分钟)的间隔。

通过源码看这个方案,其实可以看出有几个不太完善的问题:

cleanExpireMsg();
  1. 消费timeout的时间非常不精确。由于扫描的间隔是15分钟,所以实际上触发的时候,消息是有可能卡住了接近30分钟(15*2)才被清理。
  2. 由于定时器一启动就开始调度了,中途这个consumeTimeout再更新也不会生效。
pushConsumer.sendMessageBack(msg, 3);
    参考链接:https://blog.csdn.net/ph3636/article/details/90484951

    public void sendMessageBack(MessageExt msg, int delayLevel)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
            msg.setTopic(withNamespace(msg.getTopic()));
            this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
    }
        
     public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) 	throws RemotingException, MQBrokerException, 
     		InterruptedException, MQClientException {
            try {
                String brokerAddr 
                    = (null != brokerName) ? 
                    this.mQClientFactory

                    	.findBrokerAddressInPublish(brokerName)
                    : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
                    //发送消息到原Topic
                this.mQClientFactory
                    .getMQClientAPIImpl()		
                    .consumerSendMessageBack(brokerAddr,msg,
                             this.defaultMQPushConsumer.getConsumerGroup(), 
                             delayLevel, 5000, getMaxReconsumeTimes());
                    
            } catch (Exception e) {
    			//当发送重试消息出现异常时 有可能是broker异常
                //这个时候采用正常的发送流程,重新创建一个新消息,
    			//把topic改成重试组的topic,保存以前的消息id。
    			//默认最大重试次数为16,所以一个消息最多可以消费17次,默认设置延迟级别范围为3到18。
                //为什么是16?因为18个等级,从3开始重试,18-3+1 =15
                //发送到重试消费Topic 是%RETRY% + 消费组名 注意是消费组名
                //我们思考一下为什么是消费者组名?
                //A消费者组消费成功 B消费者组消费失败
                //如果发回原topic就有问题了,A又会消费一次
                Message newMsg 
                    = new Message(MixAll.getRetryTopic(
                    		this.defaultMQPushConsumer.getConsumerGroup()), 
                       				msg.getBody());
    			//当然第一次重试是没有属性源消息id的,
    			//这个时候设置当前消息的id为源消息id,
                String originMsgId = MessageAccessor.getOriginMessageId(msg);
                MessageAccessor.setOriginMessageId
                   (newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
    			
                //将原真实Topic的属性赋值给重试topic
                newMsg.setFlag(msg.getFlag());
                MessageAccessor.setProperties(newMsg, msg.getProperties());
                MessageAccessor.putProperty
                    (newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
                //设置重试次数加1,重试次数默认是0
                MessageAccessor.setReconsumeTime
                    (newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
                MessageAccessor.setMaxReconsumeTimes
                    (newMsg, String.valueOf(getMaxReconsumeTimes()));
                //重试次数默认是0 即 3 + 0 为 DelayTimeLevel 即 10秒后再次重试
                //3-18 即 18-3+1 = 16 即重试16次
                newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
                //发送消息到重试Topic
                this.mQClientFactory.getDefaultMQProducer().send(newMsg);
            } finally {
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
            }

        }


        private int getMaxReconsumeTimes() {
            // default reconsume times: 16
            if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
                return 16;
            } else {
                return this.defaultMQPushConsumer.getMaxReconsumeTimes();
            }
        }
MQClientAPIImpl#consumerSendMessageBack
    //发送重试消息到原Topic,组装消息头类ConsumerSendMsgBackRequestHeader
    //请求码为CONSUMER_SEND_MSG_BACK = 36
    public void consumerSendMessageBack(
            final String addr,
            final MessageExt msg,
            final String consumerGroup,
            final int delayLevel,
            final long timeoutMillis,
            final int maxConsumeRetryTimes
        ) throws RemotingException, MQBrokerException, InterruptedException {
            ConsumerSendMsgBackRequestHeader requestHeader = 
            		new ConsumerSendMsgBackRequestHeader();
            RemotingCommand request = RemotingCommand
            		.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
            requestHeader.setGroup(consumerGroup);
            requestHeader.setOriginTopic(msg.getTopic());
            requestHeader.setOffset(msg.getCommitLogOffset());
            requestHeader.setDelayLevel(delayLevel);
            requestHeader.setOriginMsgId(msg.getMsgId());
            requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);


            RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
                request, timeoutMillis);
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.SUCCESS: {
                    return;
                }

                default:
                    break;
            }

            throw new MQBrokerException(response.getCode(), response.getRemark());
        }

#### ConsumeMessageOrderlyService#start

    //首次延迟1秒启动 此后每20秒执行一次 
    public void start() {
           if(MessageModel.CLUSTERING.equals(
           ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        ConsumeMessageOrderlyService.this.lockMQPeriodically();
                    }
                }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
            }
        }
ConsumeMessageOrderlyService#lockMQPeriodically
顺序消费要周期给自己分配到的队列加锁
     public synchronized void lockMQPeriodically() {
            if (!this.stopped) {
                this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
            }
        }
RebalanceImpl#lockAll
    /***
    通过ReblanceImp的lockAll方法,每隔一段时间定时锁住当前消费端正在消费的队列。
    设置本地队列ProcessQueue的locked属性为true。
    保证broker中的每个消息队列只对应一个消费端,防止重复消费
    ***/
    public void lockAll() {
         	/***
               从ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 中获取keySet
               keySet中的key是MessageQueue
               通过MessageQueue获取BrokerName 作为key,MessageQueue作为value放入
               HashMap<String brokerName, Set<MessageQueue>> 
            ***/
            HashMap<String, Set<MessageQueue>> brokerMqs 
                					=this.buildProcessQueueTableByBrokerName();
    		
            Iterator<Entry<String, Set<MessageQueue>>> it
                					= brokerMqs.entrySet().iterator();
        
        	//遍历brokerMqs 即 HashMap<String/* brokerName */, Set<MessageQueue>> 
            while (it.hasNext()) {
                Entry<String, Set<MessageQueue>> entry = it.next();
                final String brokerName = entry.getKey();
                final Set<MessageQueue> mqs = entry.getValue();

                if (mqs.isEmpty())
                    continue;
    			
                //获取brokerName对应的master节点
                FindBrokerResult findBrokerResult = this
                    .mQClientFactory
                    .findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
                
                //master节点不为空
                if (findBrokerResult != null) {
                    LockBatchRequestBody requestBody = new LockBatchRequestBody();
                    requestBody.setConsumerGroup(this.consumerGroup);
                    requestBody.setClientId(this.mQClientFactory.getClientId());
                    requestBody.setMqSet(mqs);


                    try {
                        //批量对MessageQueue加锁
                        Set<MessageQueue> lockOKMQSet =this
                          .mQClientFactory
                          .getMQClientAPIImpl()
                          .lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
    					//遍历加锁成功的MessageQueue
                        for (MessageQueue mq : lockOKMQSet) {
                            ProcessQueue processQueue = this.processQueueTable.get(mq);
                            if (processQueue != null) {
                                if (!processQueue.isLocked()) {
                           		log.info
                                    ("the message queue locked OK, Group: {} {}", 		                                                         this.consumerGroup, mq);
                                }
    							//给processQueue设置加锁状态为成功
                                processQueue.setLocked(true);
                                //设置上一次加锁时间
                                processQueue.setLastLockTimestamp
                                    			(System.currentTimeMillis());
                            }
                        }
                        //获取加锁失败的MessageQueue
                        for (MessageQueue mq : mqs) {
                            if (!lockOKMQSet.contains(mq)) {
                                ProcessQueue processQueue 
                                    	= this.processQueueTable.get(mq);
                                if (processQueue != null) {
                                    //设置加锁状态为失败
                                    processQueue.setLocked(false);
                                }
                            }
                        }
                    } catch (Exception e) {
                        log.error("lockBatchMQ exception, " + mqs, e);
                    }
                }
            }
        }	

7.启动MQClientInstance

MQClientInstance#start

    DefaultMQPullConsumerImpl.start()  (org.apache.rocketmq.client.impl.consumer)
    DefaultMQAdminExtImpl.start()  (org.apache.rocketmq.tools.admin)
    DefaultMQPushConsumerImpl.start()  (org.apache.rocketmq.client.impl.consumer)
    DefaultMQProducerImpl.start(boolean)  (org.apache.rocketmq.client.impl.producer)
        
    //消费端启动和producer启动很类似,可以和producer启动进行对比。
    //不同之处是消费端的PullMessageService、RebalanceService才有真正作用,而producer该两个服务线程是无用的,这两个服务线程也是消费端的核心。    
        
    //上面的类都调用的该方法 所以这方法不是只有生产者调用
    //这也解释了为什么producer启动时为什么启动了那么多服务
    //因为不管是消费者还是生产者都需要从nameServer拉取服务地址列表、发送心跳
    //因此启动时不管先启动消费者还是先启动生产者 这些操作都是必不可少的。
    //如果这样那么为什么不放到一起执行呢?
    public void start() throws MQClientException {

            synchronized (this) {
                switch (this.serviceState) {
                    case CREATE_JUST:
                        this.serviceState = ServiceState.START_FAILED;
                        // If not specified,looking address from name server
                        if (null == this.clientConfig.getNamesrvAddr()) {
                            this.mQClientAPIImpl.fetchNameServerAddr();
                        }
                        // Start request-response channel
                        //和远程服务打交道的实例
                        this.mQClientAPIImpl.start();
                       // 2分钟更新一次nameServer的地址,该地址要求返回结果为一个ip列表,以;隔开,					// 如果获取回来的地址跟现有的地址不一致则会更新缓存的NameServer地址列表。
                       // 30秒更新一次topic的路由信息
                       // 30秒对所有的Broker发送一次心跳检测,并将下线的broker删除
                       // 5秒持久化一次consumer的offset
                       // 1分钟调整一次线程池,这个定时任务其实什么都没有执行。
                        this.startScheduledTask();
                        // 启动拉取消息的线程
                        this.pullMessageService.start();
                        // 启动负载均衡服务 这里是一个Thread 所以看run方法
                        this.rebalanceService.start();
                        // Start push service
                        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    case RUNNING:
                        break;
                    case SHUTDOWN_ALREADY:
                        break;
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                    default:
                        break;
                }
            }
        }

<!---->


    同一个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表
    /* key 为 clientId 即ip + @ + pid */
    ConcurrentMap<String, MQClientInstance> factoryTable = 
    new ConcurrentHashMap<String,MQClientInstance>();

    同一个clientId只会创建一个MQClientInstance。MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道

this.startScheduledTask();

每隔2分钟更新一次nameServer的地址

该地址要求返回结果为一个ip列表,以;隔开。

如果获取回来的地址跟现有的地址不一致则会更新缓存的NameServer地址列表。

每隔30秒更新一次topic的路由信息
每隔30秒向所有的Broker发送1次心跳
每隔30秒将下线的broker删除
每隔5秒持久化一次消费进度

消费者:

    @Override

        public void persistConsumerOffset() {
            try {
                this.makeSureStateOK();
                Set<MessageQueue> mqs = new HashSet<MessageQueue>();
                Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
                mqs.addAll(allocateMq);

                this.offsetStore.persistAll(mqs);
            } catch (Exception e) {
                log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
            }
        }


<!---->


      @Override
        public void persistAll(Set<MessageQueue> mqs) {
            if (null == mqs || mqs.isEmpty())
                return;


            final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
            if (!mqs.isEmpty()) {
                //遍历mq 循环更新消息进度
                for (Map.Entry<MessageQueue, AtomicLong> entry : 
                     					this.offsetTable.entrySet()) {
                    MessageQueue mq = entry.getKey();
                    AtomicLong offset = entry.getValue();
                    if (offset != null) {
                        if (mqs.contains(mq)) {
                            try {
                                this.updateConsumeOffsetToBroker(mq, offset.get());
                        } else {
                            unusedMQ.add(mq);
                        }
                    }
                }
            }



            if (!unusedMQ.isEmpty()) {
                for (MessageQueue mq : unusedMQ) {
                    this.offsetTable.remove(mq);
                    log.info("remove unused mq, {}, {}", mq, this.groupName);
                }
            }
        }


broker:

    @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx, 
                                           			RemotingCommand request)
            throws RemotingCommandException {
            switch (request.getCode()) {
                case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                    return this.getConsumerListByGroup(ctx, request);
                case RequestCode.UPDATE_CONSUMER_OFFSET:
                    return this.updateConsumerOffset(ctx, request);
                case RequestCode.QUERY_CONSUMER_OFFSET:
                    return this.queryConsumerOffset(ctx, request);
                default:
                    break;
            }
            return null;
        }

<!---->

    private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
            final UpdateConsumerOffsetRequestHeader requestHeader =
                (UpdateConsumerOffsetRequestHeader) request
                    .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
                requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

<!---->

     public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
            final long offset) {
            // 默认消费的进度的key是topic@group 
            String key = topic + TOPIC_GROUP_SEPARATOR + group;
            this.commitOffset(clientHost, key, queueId, offset);
        }

<!---->

    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        	//现根据key:topic@group 获取到队列id对应的消费进度
        	//{0:1,1:1,2:1,3:1,4:1,5:2,6:1,7:1}
            ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        	//如果是空,说明原来没有创建过就新建1个
            if (null == map) {
                map = new ConcurrentHashMap<Integer, Long>(32);
                map.put(queueId, offset);
                this.offsetTable.put(key, map);
            } else {
                //如果不为空那么就把queueid作为key offset作为进度持久化
                Long storeOffset = map.put(queueId, offset);
                if (storeOffset != null && offset < storeOffset) {
                    log.warn("update consumer offset less than store.");
                }
            }
        }

<!---->

    //tyrant@group1代表的是主题tyrant的消费者组group1的消费进度
    //0:1代表的是queueid为0的消费到了在ConsumeQueue中下标为1的索引对应的消息
    {
    	"offsetTable":{
    		"tyrant@group1":{0:1,1:1,2:1,3:1,4:1,5:2,6:1,7:1
    		},
    		"%RETRY%group1@group1":{0:1
    		}
    	}
    }
每隔1分钟调整一次线程池
    private void startScheduledTask() {
        	//这里的namesrvAddr不是null
        	//原因是我们设置了
        	//consumer.setNamesrvAddr("localhost:9876");
        	//这个定时任务会跳过
        	//如果没有指定namesrvAddr才会拉取
            if (null == this.clientConfig.getNamesrvAddr()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                        } catch (Exception e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            
                
    		//更新路由元信息
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                    } catch (Exception e) {
                        //删除日志
                    }
                }
            }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    		//1.清理下线的broker 2.发送消费者和生产者的心跳信息到broker
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.cleanOfflineBroker();
                        MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                    } catch (Exception e) {
                       log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                    }
                }
            }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        MQClientInstance.this.persistAllConsumerOffset();
                    } catch (Exception e) {
                       log.error("ScheduledTask persistAllConsumerOffset exception", e);
                    }
                }
            }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);


            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        MQClientInstance.this.adjustThreadPool();
                    } catch (Exception e) {
                        log.error("ScheduledTask adjustThreadPool exception", e);
                    }
                }
            }, 1, 1, TimeUnit.MINUTES);
        }

this.pullMessageService.start();

启动拉取消息服务

因为PullMessageService是一个线程所以需要看run方法

    @Override

        public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    PullRequest pullRequest = this.pullRequestQueue.take();
                    this.pullMessage(pullRequest);
                } catch (InterruptedException ignored) {
                } catch (Exception e) {
                    log.error("Pull Message Service Run Method exception", e);
                }

            }


            log.info(this.getServiceName() + " service end");
        }

默认使用的是DefaultMQPushConsumerImpl,也就是推模式。

    private void pullMessage(final PullRequest pullRequest) {
            final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
            if (consumer != null) {
                DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
                impl.pullMessage(pullRequest);
            } else {
                log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
            }
        }

this.rebalanceService.start();

启动负载均衡消费服务

消费端消息队列负载的核心功能方法是org.apache.rocketmq.client.impl.consumer.RebalanceImpl.updateProcessQueueTableInRebalance(String, Set, boolean),只解释该方法,其余方法看流程图看代码就很容易明白。

传入参数Set是经过负载后分配给当前消费端的mq集合,boolean表示是顺序消费true,并发消费false。

参考链接:blog.csdn.net/yulewo123/a…

服务负载均衡下篇文章专门讲。

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MY01flDM' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片