28.RocketMQ之消费者的负载均衡源码

消费者负载均衡流程

当一个业务系统部署多台机器时,每台机器都启动了一个Consumer,并且这些Consumer都在同一个ConsumerGroup也就是在同1个消费组中。此时一个消费组中多个Consumer消费一个Topic,而一个Topic对应多个MessageQueue。

比如TopicA有2个Consumer,6个MessageQueue,那么这6个MessageQueue怎么分配呢?

这就涉及到Consumer的负载均衡了。

首先 Consumer 在启动时,会把自己注册给所有 Broker并保持心跳,让每一个 Broker 都知道消费组中有哪些 Consumer 。

然后Consumer在消费时,会随机连接一台Broker ,获取消费组中的所有Consumer。

然后根据要消费的Topic的messageQueue和消费者的数量做负载均衡。

因为是分布式环境下的负载均衡,所以如何让每个消费者都能保证看到的视图是一样呢?

答案是先对消费者和messageQueue排序,然后每个消费者使用相同的负载均衡策略做负载均衡。

这样每个消费者看到的负载均衡分配的messageQueue的视图就是一致的。

负载均衡主要流程如下:

假如topicA 有 6个队列。

消费者1启动订阅topicA。通过findConsumerIdList方法到broker获取到所有的消费者,发现是1个。

对消费者和队列进行排序,然后执行负载均衡策略,获取到当前消费者的分配到的mqSet是6个。

此时消费者2启动,获取所有的消费者,发现是2个。

对消费者和队列进行排序。执行负载均衡策略。获取到当前消费者分配到的mqSet是4 5 6。

此时消费者2就可以对4 5 6这3个队列做消息的拉取了。

过了20秒消费者1的负载均衡任务又触发了,流程同上,消费者1获取到当前消费者分配到的mqSet是1 2 3。

此时消费者1就可以对1 2 3 这3个队列做消息的拉取了。

说明:为了方便,有些地方的messageQueue使用mq代替。

/*
循环遍历所有的topic,获取对应的队列集合messageQueueSet。
判断是否是顺序消费,是否顺序消费判断的依据是当前消费者绑定的listener是并发还是顺序
 
 在广播模式下
 1.遍历的是所有的mqset
    
 在集群模式下
 1.遍历的是所有的allocateResult 即根据负载均衡策略分配的mqSet
 2.移除新增的mq
 3.移除长时间未拉取的mq
 不会移除顺序消费且是集群模式的mq,也就是说不会参与负载均衡??
 4.遍历移除了不重要的(新增||长时间未拉取)的mq的mqSet
 5.如果消费者是顺序消费,尝试加锁该MessageQueue(远程向服务器端请求加锁,并设置本地mq加锁状态),
   加锁失败就跳过该mq 不会构建该mq的pullRequest。否则也会构建该mq的pullRequest。
 6.清除本地该mq的消费offset,从服务器拉取更新本地mq的消费offset
 7.构建pullRequest准备拉取消息
 
 在 rebalance 时,需要对 队列,还有消费者客户端 ID 进行排序,以确保同一个消费组下的视图是一致的。
 */


ServiceThread#start

public void start() {
    log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
    if (!started.compareAndSet(false, true)) {
        return;
    }
    stopped = false;
    //this 指的是rebalanceService
    this.thread = new Thread(this, getServiceName());
    this.thread.setDaemon(isDaemon);
    this.thread.start();
} 

RebalanceService#run

@Override



public void run() {
    log.info(this.getServiceName() + " service started");
    //
    while (!this.isStopped()) {
        //waitInterval 20秒 也就是每20秒需要负载均衡一次
        this.waitForRunning(waitInterval);
        //做负载均衡操作
        this.mqClientFactory.doRebalance();
    }
​
    log.info(this.getServiceName() + " service end");
}

MQClientInstance#doRebalance

每个消费者都需要做负载均衡

public void doRebalance() {
     //MQClientInstance遍历已注册的消费者,对消费者执行doRebalance()方法
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }

}

MQConsumerInner有2个实现,DefaultMQPullConsumerImpl和DefaultMQPushConsumerImpl。

分别是对应拉和推模式。默认DefaultMQPushConsumerImpl即推。

//DefaultMQPullConsumerImpl
//pull this.rebalanceImpl.doRebalance(false);

//DefaultMQPushConsumerImpl  
//push this.rebalanceImpl.doRebalance(this.isConsumeOrderly());

重点看推模式下的负载均衡。

DefaultMQPushConsumerImpl#doRebalance

假如topicA 有 6个队列。

消费者1启动订阅topicA。通过findConsumerIdList方法到broker获取到所有的消费者,发现是1个。

对消费者和队列进行排序,然后执行负载均衡策略,获取到当前消费者的分配到的mqSet是6个。

此时消费者2启动,获取所有的消费者,发现是2个。

对消费者和队列进行排序。执行负载均衡策略。获取到当前消费者分配到的mqSet是4 5 6。

此时消费者2就可以对4 5 6这3个队列做消息的拉取了。

过了20秒消费者1的负载均衡任务又触发了,流程同上,消费者1获取到当前消费者分配到的mqSet是1 2 3。

此时消费者1就可以对1 2 3 这3个队列做消息的拉取了。


参考链接:blog.csdn.net/HoneyYHQ998…

参考链接:blog.csdn.net/HoneyYHQ998…

/****
consumeOrderly:是否是顺序消费,consumeOrderly由监听器的类型决定
如果监听器是MessageListenerOrderly consumeOrderly 是true
如果监听器是MessageListenerConcurrently consumeOrderly 是false
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService
                (this, (MessageListenerOrderly) 
                        this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService
                (this, (MessageListenerConcurrently) 
                        this.getMessageListenerInner());
}
****/
@Override
public void doRebalance() {
    if (!this.pause) {
        
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }
}

注意doRebalance的参数consumeOrderly。

consumeOrderly:是否是顺序消费,consumeOrderly由监听器的类型决定。

如果监听器是MessageListenerOrderly consumeOrderly是true。

如果监听器是MessageListenerConcurrently consumeOrderly是false。

RebalanceImpl#doRebalance

//isOrder是否需要顺序消费消息
public void doRebalance(final boolean isOrder) {
    //遍历topic 对每个topic订阅的队列进行重新负载
    // ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner
    //subscriptionInner的结构是1个Map,key是Topic,value是订阅信息
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        //注意在这里是for循环每个topic
       for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                //根据topic负载均衡
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }

            }
        }
    }


    this.truncateMessageQueueNotMyTopic();
}

这里有个点:是for循环遍历topic,难道每个消费者组中的消费者还能订阅多个topic?

答案是肯定的!

有兴趣的同学可参考:blog.csdn.net/u011385940/…

接着往下看

RebalanceImpl#rebalanceByTopic

//topic 消息主题
//isOrder 是否顺序消费 
private void rebalanceByTopic(final String topic, final boolean isOrder) {


        switch (messageModel) {
            //广播模式每个消费者都需要消费所有的消息
            //所以在updateProcessQueueTableInRebalance中传入的是该topic下的所有的mqSet
            case BROADCASTING: {
                //获取每个topic的所有队列集合
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = 
                        this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                    }
                } else {
                    //不存在MessageQueue
                }

                break;
            }
                
            //集群模式
            case CLUSTERING: {
                 //1.获取topic的所有队列集合
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                //2.从broker端获取topic的消费者集合,消费者集合种存放的是所有消费者的clientId
                //先根据topic获取topic所在的所有的broker的地址
                //随机选择1个broker地址拉取消费者列表
                //cidAll中的clientId长这样:172.18.120.141@21908
                List<String> cidAll = 
                    this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
			//不存在MessageQueue
                    }
                }

                if (null == cidAll) {
                    
                }
		//给消费者分配队列
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
					
                    //将cid和mq做排序 保证每个消费者的视图一致
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    //默认是AllocateMessageQueueAveragely
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        //AllocateMessageQueueAveragely的allocate
                        //mqAll 队列集合
                        //cidAll 消费者集合
                        //根据队列集合和消费者集合进行重新负载均衡
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            //本机的clientId
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception}");
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    //集群模式  
                    //集群模式每个消费者组的一个实例对个一个
                    //所以在updateProcessQueueTableInRebalance中传入的是allocateResultSet
                    boolean changed = this.updateProcessQueueTableInRebalance
                                            (topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info("rebalanced result changed");
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

广播模式

广播模式首先获取mqset,mqSet是该topic下面的所有的messageQueue。

然后调用updateProcessQueueTableInRebalance方法,根据返回值判断是否调用messageQueueChanged方法。

集群模式

1.获取topic下的所有队列的集合记作mqSet。

2.根据topic获取topic所在的所有的broker的地址,然后随机选择1个broker地址拉取消费者列表,

3.消费者集合中存放的是所有消费者的clientId记作cidAll。cidAll中的clientId格式:172.18.120.141@21908

4.将cid和mq做排序 保证每个消费者的视图一致

5.获取负载均衡策略,默认是AllocateMessageQueueAveragely

6.调用负载均衡策略的allocate方法,需要传入mqSet、cidAll、消费者的clientId、消费者组,allocate方法返回结果记作allocateResult。

7.将分配给自己的mq集合allocateResult加入allocateResultSet。

然后调用updateProcessQueueTableInRebalance方法,根据返回值判断是否调用messageQueueChanged方法。

虽然广播和集群都调用了updateProcessQueueTableInRebalance方法但是传入的参数mqSet不同。

广播模式传入的是topic下面的所有的messageQueue。

集群模式传入的allocateResult是该消费者在该topic分配到的队列。

其实不管是集群还是广播,传入的参数mqSet都是分配给消费者的队列。这么理解也没毛病!

RebalanceImpl#updateProcessQueueTableInRebalance

//mqSet消费者分配到的消息队列的集合
//isOrder是否是顺序消息
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {
    boolean changed = false;
    
    //it中存放的是Entry<MessageQueue,ProcessQueue>
    Iterator<Entry<MessageQueue,ProcessQueue>> it 
                                 		= this.processQueueTable.entrySet().iterator();
   //遍历Entry<MessageQueue,ProcessQueue>
    while (it.hasNext()) {
        
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();

        if (mq.getTopic().equals(topic)) {
              //假如所有的是123456,当前消费者分配的是456
              //那么遍历123456 判断当前消费者分配的456是否包含123456
              //这样做的目的是防止原来分配的是123 现在分配的是456
              //如果不丢弃123 会造成123的重复消费
              //mqSet是分配给当前消费者的队列集合
              //如果这个队列没有分配给当前消费者 
              //那么需要丢弃该队列丢弃该ProcessQueue
              //那么在拉取消息的时候就不会对该ProcessQueue进行拉取
            if (!mqSet.contains(mq)) {
            	//设置丢弃标识
                pq.setDropped(true);
                //广播模式直接返回true
                //如果是集群模式且顺序消费 返回false
                //把该mq的客户端消费offset更新到broker保存,移除客户端该mq的消费offset记录。
                //移除已经下线的mq
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, remove unnecessary mq");
                }
                //PULL_MAX_IDLE_TIME 默认是2分钟
                //("rocketmq.client.pull.pullMaxIdleTime","120000")
                //lastPullTimestamp + 2分钟 < 当前时间  认为该mq失效 
            } else if (pq.isPullExpired()) { 
                switch (this.consumeType()) {
                     //PULL
                    case CONSUME_ACTIVELY:
                        break;
                    //PUSH
                    case CONSUME_PASSIVELY:
                        //push走这里
                        //把ProcessQueue置为失效
                        //这样在PullService线程拉取的时候该对象是失效状态,就不再拉取该对象
                        pq.setDropped(true);
                        
                        //广播模式直接返回true
                		//如果是集群模式且顺序消费 返回false
                		//把该mq的客户端消费offset更新到broker保存
                        //移除客户端该mq的消费offset记录。
                        //从集合移除长时间没有拉取消息的mq
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }//end while
 
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    
    //mqSet是清理了不必要的mq的mqSet
    for (MessageQueue mq : mqSet) {
        	//说明该mq是新增的 旧的不需要分配 已经分配过了
        if (!this.processQueueTable.containsKey(mq)) {
            //如果消费者是顺序消费
            //尝试加锁该MessageQueue
            //加锁失败就跳过该mq 不会拉去该mq的消息
            
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, add a new mq failed, because lock failed");
                continue;
            }
            
            //消费客户端移除该mq的消费offset
            //通过LocalFileOffsetStore或RemoteBrokerOffsetStore 移除 该消息队列的偏移量
            this.removeDirtyOffset(mq);
            
            ProcessQueue pq = new ProcessQueue();
            //向broker发送命令QUERY_CONSUMER_OFFSET获取broker端记录的该mq的消费offset
            long nextOffset = this.computePullFromWhere(mq);
            
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists");
                } else {
                    //构建拉取request
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }
    //遍历pullRequestList集合
    //把pullRequest对象添加到PullMessageService服务线程的阻塞队列内供PullMessageService拉取执行
    this.dispatchPullRequest(pullRequestList);
    return changed;
}

updateProcessQueueTableInRebalance是消费端重新负载的核心方法。

顾名思义功能就是更新处理器队列集合RebalanceImpl.processQueueTable。

那么什么是processQueue,它的作用是什么呢?

它是一个队列消费快照,消息拉取的时候,会将实际消息体、拉取相关的操作存放在其中。比如消费进度,消费等等功能的底层核心数据保存都是有ProcessQueue提供。

另外在拉取消息的时候使用的是PullRequest去请求,PullRequest结构如下:

public class PullRequest {
    private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue;
    private long nextOffset;
    private boolean previouslyLocked = false;
}

可以发现1个ProcessQueue和1个MessageQueue是一一对应的,MessageQueue用来表示拉取回来的消息元数据信息。

具体可参考:blog.csdn.net/Saintmm/art…

假设有6个队列,有2个消费者分别是消费者A和消费者B。

————————————————消费者B第一轮负载均衡——————————————————

消费者B第一次做负载均衡分到的队列是123

1.先从processQueueTable获取Entry<MessageQueue, ProcessQueue>集合it

2.由于是第一次做负载均衡,所以it为空,此时跳过while循环

3.遍历本次分到的mqSet,判断processQueueTable中是否存在对应的mq,如果不存在说明是新分配的mq。

4.如果是新分配给当前消费者的mq&&当前消费者是顺序消费&&加锁失败啥也不做。

为什么要加锁?

因为顺序消费需要加分布式锁+本地锁,此处是在broker加分布式锁。

为什么加锁失败啥也不做?

因为加锁失败,说明是其它的消费者在broker加的分布式锁还没释放,那么等下次Rebalance的时候再尝试加锁即可。

5如果是新分配给当前消费者的mq&&当前消费者不是顺序消费&&加锁成功。

5.1调用removeDirtyOffset清除本地的脏消费偏移量

5.2重新计算队列拉取消息的偏移量。

5.3构建拉取消息的PullRequest

6.分发PullRequest拉取消息

————————————————消费者B第二轮负载均衡——————————————————

消费者B第二次做负载均衡分到的队列mqSet是34

1.先从processQueueTable获取Entry<MessageQueue, ProcessQueue>集合it,集合it中存放的是123

2.由于是第二次做负载均衡,所以it不为空,此时进入while循环

3.遍历it集合,集合it中存放的是123

4.先判断分配的mqSet是否包含1、2、3,mqSet是3、4,所以队列1、2会被丢弃

5.队列3显然是包含在mqSet中的,对于队列3会判断拉取是否过期,如果拉取过期那么要丢弃队列3,在removeUnnecessaryMessageQueue方法中如果消费者B是顺序消费还需要释放broker端的分布式锁。

6.遍历本次分到的mqSet,判断processQueueTable中是否存在对应的mq,如果不存在说明是新分配的mq。

7.如果是新分配给当前消费者的mq&&当前消费者是顺序消费&&加锁失败啥也不做。

为什么要加锁?

因为顺序消费需要加分布式锁+本地锁,此处是在broker加分布式锁。

为什么加锁失败啥也不做?

因为加锁失败,说明是其它的消费者在broker加的分布式锁还没释放,那么等下次Rebalance的时候再尝试加锁即可。

8.如果是新分配给当前消费者的mq&&当前消费者不是顺序消费&&加锁成功。

8.1调用removeDirtyOffset清除本地的脏消费偏移量

8.2重新计算队列拉取消息的偏移量。

8.3构建拉取消息的PullRequest

9.分发PullRequest拉取消息

RebalancePushImpl#removeUnnecessaryMessageQueue

@Override



public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
    //持久化当前消费进度到broker
    this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
    //移除本地消费进度
    this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
    //集群模式且顺序消费
    if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
        && MessageModel.CLUSTERING.equals
                    (this.defaultMQPushConsumerImpl.messageModel())) {
        try {
            if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
                try {
                    return this.unlockDelay(mq, pq);
                } finally {
                    pq.getLockConsume().unlock();
                }

            } else {
                log.warn("mq is consuming, so can not unlock it, {}. maybe hanged for a while");
                //增加tryUnlockTimes
                pq.incTryUnlockTimes();
            }
        } catch (Exception e) {
            log.error("removeUnnecessaryMessageQueue Exception", e);
        }
        return false;
    }

    //否则返回true
    return true;
}

his.unlockDelay(mq, pq);中调用RebalancePushImpl.this.unlock(mq, true);解除分布式锁

RebalancePushImpl#computePullFromWhere

	@Override
    public long computePullFromWhere(MessageQueue mq) {
        long result = -1;
        //默认是从上一个OFFSET消费 默认策略,从该队列最尾开始消费,即跳过历史消息
    	//private ConsumeFromWhere consumeFromWhere = 
    	// 			ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
        final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
        final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
        switch (consumeFromWhere) {
            case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
            case CONSUME_FROM_MIN_OFFSET:
            case CONSUME_FROM_MAX_OFFSET:
                //默认是从上次消费的地方拉取
            case CONSUME_FROM_LAST_OFFSET: {
                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                if (lastOffset >= 0) {
                    //返回本地消费的偏移量
                    result = lastOffset;
                }

                // First start,no offset
                else if (-1 == lastOffset) {
                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        result = 0L;
                    } else {
                        try {
                            //从broker服务器拉取
                            result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    }
                } else {
                    result = -1;
                }
                break;
            }
            case CONSUME_FROM_FIRST_OFFSET: {
                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                if (lastOffset >= 0) {
                    result = lastOffset;
                } else if (-1 == lastOffset) {
                    result = 0L;
                } else {
                    result = -1;
                }
                break;
            }
            case CONSUME_FROM_TIMESTAMP: {
                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                if (lastOffset >= 0) {
                    result = lastOffset;
                } else if (-1 == lastOffset) {
                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        try {
                            result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    } else {
                        try {
                            long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                                UtilAll.YYYYMMDDHHMMSS).getTime();
                            result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    }
                } else {
                    result = -1;
                }
                break;
            }

            default:
                break;
        }

        return result;
    }

默认是CONSUME_FROM_LAST_OFFSET,从上次消费的地方拉取。

先从offsetStore获取拉取消息的偏移量,如果从offsetStore获取不到。

那么从broker端获取队列拉取消息的偏移量。

DefaultMQPullConsumerImpl负载均衡

MQClientInstance#start

 public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // 启动负载均衡服务 这里是一个Thread 所以看run方法
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

ServiceThread#start

    public void start() {
        log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
        if (!started.compareAndSet(false, true)) {
            return;
        }
        stopped = false;
        //this 指的是rebalanceService
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    } 

RebalanceService#run

@Override



    public void run() {
        log.info(this.getServiceName() + " service started");


        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            //做负载均衡操作
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

MQClientInstance#doRebalance

每个消费者都需要做负载均衡

  public void doRebalance() {
         //MQClientInstance遍历已注册的消费者,对消费者执行doRebalance()方法
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    //MQConsumerInner impl有2个实现
                    
                    //DefaultMQPullConsumerImpl
                    //pull this.rebalanceImpl.doRebalance(false);
                    
                    //DefaultMQPushConsumerImpl  
                    //push this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
                    impl.doRebalance();
                } catch (Throwable e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }

DefaultMQPullConsumerImpl#doRebalance

@Override



    public void doRebalance() {
        if (this.rebalanceImpl != null) {
            //注意这里传的一直是false
            this.rebalanceImpl.doRebalance(false);
        }
    }

DefaultMQPushConsumerImpl做负载均衡是否是顺序消费是根据消费者关联的监听器确定。

DefaultMQPullConsumerImpl做负载均衡是否是顺序消费传的是false。

DefaultMQPushConsumerImpl剩下的流程和DefaultMQPushConsumerImpl一模一样

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYjL04hw' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片