RocketMQ:写一个消费者程序要注意些什么

我正在参加「掘金·启航计划」

1. 背景

书接上文:RocketMQ:写一个生产者程序要注意些什么

消息队列是我们日常工作中最常用到的中间件之一,那么写一个消费者程序要注意什么,这里已 RocketMQ 为示例,简单总结下,发车!

e80d62ddd65b4688a5628f5da27bb86a.gif

关键字:RocketMQ,消息中间件,消费者

2. 消费逻辑一定要保证幂等性

消息是在计算机网络间传送的,但计算机网络并不是完全可靠的,很多消息中间件为了保证消息送达都有各自的重试机制,RocketMQ也是。

也就是说,一条消息可能会被消费多次。如果不保证消费逻辑的幂等性,就有可能会造成系统异常,所以消费逻辑一定要保证幂等性。至于具体怎么保证幂等性,那就是另一个话题了。

3. 正确使用和配合 RocketMQ 的重试机制

消费者消费失败,RocketMQ broker就会重发消息,一般会重试16次,每次重试的间隔也并不相同,重试了16次之后任然失败,就会把消息放入死信队列里。
image.png

这里我们要注意几点:

  1. 消费者要返回一个 ConsumeConcurrentlyStatus 字段,CONSUME_SUCCESS 表示成功消费,不会重试,RECONSUME_LATER 表示消费失败,会重试。
  2. 是否真要重试16次?是不是一般重试个两三次就可以了,到了第四次还失败就直接不处理,打个Error日志,让系统告警。

根据鄙人多年互联网高强度打工的经验,推荐这么写:

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {


        // 上来先打个日志,知道收到的消息具体是什么样的,好排查问题
        log.info("xxx Consumer,msgs=%s, context=%s", msgs, context);
        try {
            doProcess(msgs); //业务逻辑处理
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // 消费失败,如果重试超过三次,就打 error 告警,否则打 warn 级别的日志尝试重试
            if (msgs.get(0).getReconsumeTimes() >= 3) {
                log.error("xxx Consumer 消费失败", e);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 返回 CONSUME_SUCCESS,不再重试
            } else {
                log.warn("xxx Consumer 消费重试,开始重试", e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 返回 RECONSUME_LATER,进行重试
            }
        }

    }
});

4. 考虑是否要分优先级处理

这个问题分为三个子场景。

4.1 优先级场景一

一个 Topic 下分为 A、B、C 三类消息。A、B 两类消息的消息量大,处理速度也慢,C类消息则比较少,处理很快。实际运行的时候,往往会出现A、B大量堆积,C则只能等着前面的 A、B 消费完才会消费到 C。

如果能短任务优先,优先处理 C 类消息,这样能够提高整个系统的吞吐量。我们想实现的话,可以拆分成两个Topic,A、B消息使用一个Topic,C消息则使用另一个Topic。

4.2 优先级场景二

假设一个订单处理系统,服务了 100 家门店,正常一天处理一万个订单。突然某一天,某个门店爆单出现,出现2万个订单,这样会挤压其他门店的订单,显然不公平。
image.png

这个时候可以设置 100 个 MessageQueue,生产者根据门店 id 把门店的消息hash到不同的 MessageQueue。DefaultMQPushConsumer 默认采用循环的方式读取一个Topic的每一个 MessageQueue,这样如果某家门店的订单量大涨,这家门店对应的 MessageQueue 消息量也会跟着增加,处理时间随之增长,但不会造成其他门店等待时间增长。

image.png

另外,DefaultMQPushConsumer 默认的 pullBatchSize 是 32,也就是说,每次从 MessageQueue 最多会拉取32 条消息,再上面的场景,为了更加公平,可以把 pullBatchSize 设置成 1。

当然,现实业务场景上,门店数量是变化的,无法做 MessageQueue 的数量随着门店的数量变更,更实际的做法是设置一个特殊的 MessageQueue,某一段时间内的订单数超过某个阈值,就往特殊的MessageQueue里面扔,确保公平性。

4.3 优先级场景三

一个 Topic 下分为 A、B、C 三类消息,优先级依次递减,要求只要有 A 就优先处理,接着再处理 B,最后处理 C。

对于这种场景 RocketMQ 是不支持的,需要改写 PullConsumer 自主控制 MessageQueue 的遍历以及消息的处理。

或者正常消费,先落库,开发一个独立模块基于优先级算法处理,这样便于迁移和维护。

5. 考虑吞吐量/消费速度的问题

当 Consumer 的处理速度跟不上消息的生产速度,就会造着堆积的现象,为了避免堆积的现象,那就需要提高消费速度提高吞吐量,这往往有两种种方式。

5.1 增加 Consumer 的实例数

Consumer 实例一多,并行消费的能力自然就更强了,消费速度就会更快。

5.2 增强 Consumer 的消费能力

批量处理肯定比单个处理快,可以设置 consumerMessageBatchMaxSize 参数(默认是1),消息多的时候,可以批量处理。

5.3 减少消息量

使用消息 tag 机制,只关注关心的消息,这个上一篇文章分享了,可以看看:RocketMQ:写一个生产者程序要注意些什么

6. 考虑堆积的问题

如果 Consumer 的消费速度已经无法再优化了,还是出现堆积的情况怎么办?我们首先要考虑一个问题:如果堆积了对系统会有影响?

如果没有影响,那就不用管了,设置合理的堆积告警阈值就行。

如果有影响,就需要根据堆积情况做处理,比如堆积到一定程度可以选择抛弃消息,这样不影响后续消息的处理。

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {


                Long offset = msgs.get(0).getQueueOffset();
                String maxOffSet = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
                long diff = Long.parseLong(maxOffSet) - offset;
                if(diff > 10000) { // 堆积超过 10000 条消息,不处理了了。
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                }

                
            }
        });

7. 总结

一个消费者程序,往往不过一百行代码,但仍然要考虑很多东西:幂等、优先级、消费速度、堆积处理。各位 baby 不管是自己开发还是帮别人 review 代码都要多多注意。

都看到这了,还不点赞是不是说不过去:(╯-_-)╯╧╧

u=3612796932,1394053552&fm=253&fmt=auto&app=138&f=JPEG.webp

8. Ref

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYS9EBbS' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片