29.RocketMQ之消费者负载均衡策略

负载均衡策略概述

消费者在消费消息的时候,需要知道从Broker的哪一个消息队列中去获取消息。所以,在消费者端必须要做负载均衡,即Broker端中多个消费队列分配给同一个消费者组中的哪些消费者消费。

上篇文章讲解了负载均衡过程,这篇文章讲解消费者负载均衡的策略。

AllocateMessageQueueStrategy是负载均衡策略接口,有两个方法,分别是allocate和getName,allocate方法是为消费者分配消息队列,getName是获取负载均衡策略的名字。AllocateMessageQueueStrategy接口有六个子类实现。

负载均衡策略说明

它们具体说明如下:

  • AllocateMessageQueueAveragely:平均负载策略,将消息队列平均分配给每一个消息者。假设一个topic有8个消息队列,有3个消息者A、B、C,那么采用该负载策略,那么A首先分配3个消息队列,然后B也分配3个消息队列,最后C分配2个消息队列。

  • AllocateMessageQueueAveragelyByCircle:循环平均负载策略,跟平均负载策略不同的是。是将消息队列一个一个的分配给消息者。假设一个topic有8个消息队列,有3个消息者A、B、C。采用循环平均负载策略分配的方法是首先给A、B、C分别分配一个,然后再进行第二轮分配,也是给A、B、C分别再分配一个,这样子还剩下两个就分别分配给A、B。

  • AllocateMessageQueueByConfig:配置负载策略,用户直接给消费者分配配置消息队列。

  • AllocateMessageQueueConsistentHash:一致性哈希负载策略,为每一个消费者创建多个虚拟的节点,将虚拟节点连成一个环,这个环就是一致性哈希环,然后将消息队列进行哈希计算得到哈希值,通过哈希值找到距离一致性哈希环顺时针方向最近的那个虚拟节点,此时就可以通过虚拟节点获取到真实的消费者了,就将这个消息队列分配给这个消息者。

  • AllocateMessageQueueByMachineRoom:同机房分配策略,将Broker的消息队列分配给同机房的消费者。

  • AllocateMachineRoomNearby:AllocateMessageQueueByMachineRoom策略的升级版本,不仅将Broker的消息队列分配给同机房的消费者,还会将剩下的消息队列根据给定的分配策略进行分配给消费者。

接下来,将从源码的层面分析上述的负载均衡策略。

AllocateMessageQueueAveragely

//代码位置:org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely#allocate
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,




        List<String> cidAll) {






        //省略校验合法性的代码





        //获取当前客户端id在所有消费者里面的位置
        int index = cidAll.indexOf(currentCID);

        //看是否能够均分
        int mod = mqAll.size() % cidAll.size();
        //如果消息队列的数量小于等于消费者的数量。一个消费者最多只能分到一个消息消息队列
        //否则,如果不能均分且index小于mode,均分以后再加上一个
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        //开始的位置
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        //被分配的消息队列的范围上限
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
}

AllocateMessageQueueAveragely的allocate方法的逻辑:

  • 获取当前消费者在所有消费者里面的位置(索引)
  • 计算当前消费者能分配的消息队列的数量,以及分配的消息队列的范围,即从哪里开始分配到哪里结束分配。
  • 根据分配消息队列的范围给消费者进行分配。

AllocateMessageQueueAveragelyByCircle

//代码位置: org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle#allocate
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,




        List<String> cidAll) {






        //省略校验合法性的代码





        //当前consumer排序后的索引
        int index = cidAll.indexOf(currentCID);

        for (int i = index; i < mqAll.size(); i++) {
            //取模
            if (i % cidAll.size() == index) {
                result.add(mqAll.get(i));
            }
        }
        return result;
}

AllocateMessageQueueAveragelyByCircle的allocate方法的逻辑:

  • 获取当前消费者在所有消费者里面的位置index(索引)
  • 遍历所有的消息队列,当前消息队列的索引对消费者数量进行取模的值等于index,将此此时的消息队列分配给消费者。

AllocateMessageQueueByConfig

/代码位置:org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueByConfig#allocate
private List<MessageQueue> messageQueueList;

public void setMessageQueueList(List<MessageQueue> messageQueueList) {
        this.messageQueueList = messageQueueList;
}

    @Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        return this.messageQueueList;
}

AllocateMessageQueueByConfig类有一个messageQueueList属性,这个就是用户配置的消息队列。AllocateMessageQueueByConfig的allocate方法就是将用户配置的这个消息队列返回给消费者。

☆AllocateMessageQueueConsistentHash

//代码位置:org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueConsistentHash#allocate 
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,




        List<String> cidAll) {






        //省略校验合法性的代码





        //构建消费者节点
        Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
        for (String cid : cidAll) {
            cidNodes.add(new ClientNode(cid));
        }

        //用消费者节点生产多个虚拟的节点,构建哈希环
        final ConsistentHashRouter<ClientNode> router; //for building hash ring
        if (customHashFunction != null) {
            router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
        } else {
            router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
        }



        List<MessageQueue> results = new ArrayList<MessageQueue>();
        //遍历消息队列
        for (MessageQueue mq : mqAll) {
            //对messageQueue进行hash计算,找到顺时针最近的consumer节点
            ClientNode clientNode = router.routeNode(mq.toString());
            //判断是否是当前consumer
            if (clientNode != null && currentCID.equals(clientNode.getKey())) {
                results.add(mq);
            }
        }

        return results;

}

AllocateMessageQueueConsistentHash的allocate方法逻辑:

  • 构建消费者节点
  • 用消费者节点生成多个虚拟的节点,构建哈希环
  • 遍历消息队列,对消息队列进行hash计算,找到距离最近的消费者节点,将此时的消息队列分配给消费者。

AllocateMessageQueueConsistentHash的allocate方法实现了一致性哈希算法,如果后续有使用一致性哈希算法的场景可拿来参考。

AllocateMessageQueueConsistentHash的allocate方法实现了一致性哈希算法,如果后续有使用一致性哈希算法的场景可拿来参考。

AllocateMessageQueueConsistentHash的allocate方法实现了一致性哈希算法,如果后续有使用一致性哈希算法的场景可拿来参考。

AllocateMessageQueueByMachineRoom

//代码位置:org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueByMachineRoom#allocate 
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,




       //省略校验合法性的代码

        List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
        for (MessageQueue mq : mqAll) {
            //brokerName命名规则   machine_room1@broker-a
            String[] temp = mq.getBrokerName().split("@");
            //判断是否符合指定的机房条件
            if (temp.length == 2 && consumeridcs.contains(temp[0])) {
                premqAll.add(mq);
            }
        }

        //平均分配到的队列数
        int mod = premqAll.size() / cidAll.size();
        //取模
        int rem = premqAll.size() % cidAll.size();
        //当前分配到的第一个队列索引
        int startIndex = mod * currentIndex;
        //当前分配到的最后一个队列索引
        int endIndex = startIndex + mod;
        //取startIndex到endIndex的队列
        for (int i = startIndex; i < endIndex; i++) {
            result.add(mqAll.get(i));
        }
        //如果不能平均分配,并且模大于当前的索引,再分配一个
        if (rem > currentIndex) {
            result.add(premqAll.get(currentIndex + mod * cidAll.size()));
        }
        return result;
}

AllocateMessageQueueByMachineRoom的allocate方法逻辑:

  • 遍历所有的消息队列,将所有的指定机房的消息队列收集起来。
  • 将上述获取得到的消息队列平均分配给消费者,如果不能分配,再次分配一个消息队列。

AllocateMachineRoomNearby

//代码位置:org.apache.rocketmq.client.consumer.rebalance.AllocateMachineRoomNearby#allocate 
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,




       //省略校验合法性的代码





        //group mq by machine room
        //根据机房对队列分组
        Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
        for (MessageQueue mq : mqAll) {
            //判断当前broker处于哪个机房
            String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
            //机房不为空,将broker放到分组中
            if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
                if (mr2Mq.get(brokerMachineRoom) == null) {
                    mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
                }
                mr2Mq.get(brokerMachineRoom).add(mq);
            } else {
                throw new IllegalArgumentException("Machine room is null for mq " + mq);
            }
        }


        //group consumer by machine room
        //consumer按机房分组
        Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
        for (String cid : cidAll) {
            //判断consumer处于哪个机房
            String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
            if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
                if (mr2c.get(consumerMachineRoom) == null) {
                    mr2c.put(consumerMachineRoom, new ArrayList<String>());
                }
                mr2c.get(consumerMachineRoom).add(cid);
            } else {
                throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
            }
        }

        List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();

        //1.allocate the mq that deploy in the same machine room with the current consumer
        //给当前consumer分当前机房的那些MessageQeueue
        String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
        //得到当前机房的MessageQueue
        List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
        //得到当前机房的Consumer
        List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
        if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
            //得到当前机房所有MessageQueue和Consumers后根据指定的策略再负载
            allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
        }

        //2.allocate the rest mq to each machine room if there are no consumer alive in that machine room
        //如果该MessageQueue的机房 没有同机房的consumer,将这些MessageQueue按配置好的备用策略分配给所有的consumer
        for (String machineRoom : mr2Mq.keySet()) {
            if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
                //添加分配到的游离态MessageQueue
                allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
            }
        }

        return allocateResults;
    }

AllocateMachineRoomNearby的allocate方法逻辑:

  • 将消息队列根据机房进行分组
  • 将消费者根据机房进行分组
  • 获取当前机房的所有消息队列,将当前机房的所有消息队列再根据给定的分配策略进行分配
  • 剩余不是该机房的消息队列根据给定的分配策略进行分配给其他的消费者。

以上内容来自:zhuanlan.zhihu.com/p/443869119

AllocateMessageQueueAveragely:平均分配

AllocateMessageQueueAveragely:平均分配
举例:8个队列q1,q2,q3,q4,q5,a6,q7,q8,消费者3个:c1,c2,c3

分配如下:

c1:q1,q2,q3
c2:q4,q5,a6
c3:q7,q8

AllocateMessageQueueAveragely:平均轮询分配

AllocateMessageQueueAveragelyByCircle:平均轮询分配
举例:8个队列q1,q2,q3,q4,q5,a6,q7,q8,消费者3个:c1,c2,c3

分配如下:

c1:q1,q4,q7
c2:q2,q5,a8
c3:q3,q6

注意:消息队列的分配遵循一个消费者可以分配到多个队列,但同一个消息队列只会分配给一个消费者,故如果出现消费者个数大于消息队列数量,则有些消费者无法消费消息,且这些消费者会被浪费!

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYMxxug0' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片