我正在参加「掘金·启航计划」
1. 背景
消息队列是我们日常工作中最常用到的中间件之一,那么写一个消费者程序要注意什么,这里已 RocketMQ 为示例,简单总结下,发车!
关键字:RocketMQ,消息中间件,消费者
2. 消费逻辑一定要保证幂等性
消息是在计算机网络间传送的,但计算机网络并不是完全可靠的,很多消息中间件为了保证消息送达都有各自的重试机制,RocketMQ也是。
也就是说,一条消息可能会被消费多次。如果不保证消费逻辑的幂等性,就有可能会造成系统异常,所以消费逻辑一定要保证幂等性。至于具体怎么保证幂等性,那就是另一个话题了。
3. 正确使用和配合 RocketMQ 的重试机制
消费者消费失败,RocketMQ broker就会重发消息,一般会重试16次,每次重试的间隔也并不相同,重试了16次之后任然失败,就会把消息放入死信队列里。
这里我们要注意几点:
- 消费者要返回一个 ConsumeConcurrentlyStatus 字段,CONSUME_SUCCESS 表示成功消费,不会重试,RECONSUME_LATER 表示消费失败,会重试。
- 是否真要重试16次?是不是一般重试个两三次就可以了,到了第四次还失败就直接不处理,打个Error日志,让系统告警。
根据鄙人多年互联网高强度打工的经验,推荐这么写:
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 上来先打个日志,知道收到的消息具体是什么样的,好排查问题
log.info("xxx Consumer,msgs=%s, context=%s", msgs, context);
try {
doProcess(msgs); //业务逻辑处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 消费失败,如果重试超过三次,就打 error 告警,否则打 warn 级别的日志尝试重试
if (msgs.get(0).getReconsumeTimes() >= 3) {
log.error("xxx Consumer 消费失败", e);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 返回 CONSUME_SUCCESS,不再重试
} else {
log.warn("xxx Consumer 消费重试,开始重试", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 返回 RECONSUME_LATER,进行重试
}
}
}
});
4. 考虑是否要分优先级处理
这个问题分为三个子场景。
4.1 优先级场景一
一个 Topic 下分为 A、B、C 三类消息。A、B 两类消息的消息量大,处理速度也慢,C类消息则比较少,处理很快。实际运行的时候,往往会出现A、B大量堆积,C则只能等着前面的 A、B 消费完才会消费到 C。
如果能短任务优先,优先处理 C 类消息,这样能够提高整个系统的吞吐量。我们想实现的话,可以拆分成两个Topic,A、B消息使用一个Topic,C消息则使用另一个Topic。
4.2 优先级场景二
假设一个订单处理系统,服务了 100 家门店,正常一天处理一万个订单。突然某一天,某个门店爆单出现,出现2万个订单,这样会挤压其他门店的订单,显然不公平。
这个时候可以设置 100 个 MessageQueue,生产者根据门店 id 把门店的消息hash到不同的 MessageQueue。DefaultMQPushConsumer 默认采用循环的方式读取一个Topic的每一个 MessageQueue,这样如果某家门店的订单量大涨,这家门店对应的 MessageQueue 消息量也会跟着增加,处理时间随之增长,但不会造成其他门店等待时间增长。
另外,DefaultMQPushConsumer 默认的 pullBatchSize 是 32,也就是说,每次从 MessageQueue 最多会拉取32 条消息,再上面的场景,为了更加公平,可以把 pullBatchSize 设置成 1。
当然,现实业务场景上,门店数量是变化的,无法做 MessageQueue 的数量随着门店的数量变更,更实际的做法是设置一个特殊的 MessageQueue,某一段时间内的订单数超过某个阈值,就往特殊的MessageQueue里面扔,确保公平性。
4.3 优先级场景三
一个 Topic 下分为 A、B、C 三类消息,优先级依次递减,要求只要有 A 就优先处理,接着再处理 B,最后处理 C。
对于这种场景 RocketMQ 是不支持的,需要改写 PullConsumer 自主控制 MessageQueue 的遍历以及消息的处理。
或者正常消费,先落库,开发一个独立模块基于优先级算法处理,这样便于迁移和维护。
5. 考虑吞吐量/消费速度的问题
当 Consumer 的处理速度跟不上消息的生产速度,就会造着堆积的现象,为了避免堆积的现象,那就需要提高消费速度提高吞吐量,这往往有两种种方式。
5.1 增加 Consumer 的实例数
Consumer 实例一多,并行消费的能力自然就更强了,消费速度就会更快。
5.2 增强 Consumer 的消费能力
批量处理肯定比单个处理快,可以设置 consumerMessageBatchMaxSize 参数(默认是1),消息多的时候,可以批量处理。
5.3 减少消息量
使用消息 tag 机制,只关注关心的消息,这个上一篇文章分享了,可以看看:RocketMQ:写一个生产者程序要注意些什么
6. 考虑堆积的问题
如果 Consumer 的消费速度已经无法再优化了,还是出现堆积的情况怎么办?我们首先要考虑一个问题:如果堆积了对系统会有影响?
如果没有影响,那就不用管了,设置合理的堆积告警阈值就行。
如果有影响,就需要根据堆积情况做处理,比如堆积到一定程度可以选择抛弃消息,这样不影响后续消息的处理。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
Long offset = msgs.get(0).getQueueOffset();
String maxOffSet = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffSet) - offset;
if(diff > 10000) { // 堆积超过 10000 条消息,不处理了了。
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS
}
}
});
7. 总结
一个消费者程序,往往不过一百行代码,但仍然要考虑很多东西:幂等、优先级、消费速度、堆积处理。各位 baby 不管是自己开发还是帮别人 review 代码都要多多注意。
都看到这了,还不点赞是不是说不过去:(╯-_-)╯╧╧
8. Ref
- 《RocketMQ 实战与原理解析》杨开元
- RocketMQ:写一个生产者程序要注意些什么