RocketMQ4源码(四)生产者特性

前言

本章基于rocketmq4.6.0,分析两个生产者特性:延迟消息和事务消息。

一、延迟消息

案例

延迟消息的生产和消费,对于Producer和Consumer实例并没有特殊的操作。

仅仅通过设置Message延迟级别,即Message#setDelayTimeLevel,发送延迟消息。

DefaultMQProducer producer = new DefaultMQProducer("producer-group-1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message(
    "MyTestTopicA", // topic
    "TagA", // tag
    ("Hello Delay " + i).getBytes(StandardCharsets.UTF_8) // body
);
msg.setDelayTimeLevel(3); // 延迟级别=3 10s
SendResult sendResult = producer.send(msg); // 发送消息

Message用properties.DELAY存储延迟级别。

默认延迟级别有18个,在broker侧可配置,即MessageStoreConfig#messageDelayLevel

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

延迟消息主要逻辑都在broker侧,直接分析broker即可。

写CommitLog

CommitLog#putMessage:写commitlog前替换原始topic和queue

  1. 替换message.topic=SCHEDULE_TOPIC_XXXX,message.queueId=delayLevel-1;
  2. 设置properties.REAL_TOPIC=原topic,properties.REAL_QID=原queueId;

写ConsumeQueue

CommitLog#checkMessageAndReturnSize:

构建DispatchRequest时,原消息的tag.hashCode会被修改为投递时间,即存储时间+延迟时间。

进而影响consumequeue的20字节结构中的最后8字节tagHash变为投递时间

消费延迟Topic

ScheduleMessageService负责消费当前broker管理的topic=SCHEDULE_TOPIC_XXXX的所有queue。

初始化

ScheduleMessageService#parseDelayLevel加载延迟级别和延迟时长的映射关系到delayLevelTable

延迟topic下所有queue的消费进度都存储在delayOffset.json,加载到内存offsetTable

启动

ScheduleMessageService#start:

开启一个线程(jdk的timer)做两类事情。

一个是消费延迟topic,一个是每隔10s持久化消费进度到delayOffset.json。

由于延迟级别-1=queueId,所以为每个延迟级别开启一个DeliverDelayedMessageTimerTask

DeliverDelayedMessageTimerTask是个不可变对象。

delayLevel对应延迟级别,也对应消费queue;

offset对应从哪个逻辑offset开始消费。

消费

DeliverDelayedMessageTimerTask#executeOnTimeup:

消费延迟topic=SCHEDULE_TOPIC_XXXX,queueId=delayLevel-1。

消费逻辑和broker处理PullMessage逻辑一致,根据逻辑offset可以定位到一段consumequeue的buffer,然后循环每个consumequeue记录处理。

注意这里tagsCode不是原始消息的tag的哈希,在写ConsumeQueue阶段已经修改为目标投递时间戳

对于每条consumequeue记录,计算目标投递时间和当前时间的差countdown毫秒。

如果countdown大于0,代表还未到目标时间,即延迟topic的当前queue中,后续消息都还未到时间,因为延迟级别都相同

重新提交一个DeliverDelayedMessageTimerTask,延迟时间为countdown,下次执行即可投递真实消息,更新内存中的消费进度,当前任务直接结束。

如果countdown小于等于0,代表到了目标时间,当前消息可以给消费者消费。

根据consumequeue的commitlog物理offset+消息长度,从commitlog中读到topic=SCHEDULE_TOPIC_XXXX延迟消息。

延迟消息转换为真实消息,写入commitlog(写内存+异步刷盘),最终写入真实的consumequeue,对消费者可见。

DeliverDelayedMessageTimerTask#messageTimeup:将延迟消息转换为真实消息

使用REAL_TOPIC和REAL_QID替换topic和queueId;

移除properties.DELAY;

设置真实tag的hashCode;

设置为异步刷盘;

CommitLog和ConsumeQueue的变化关系大致如下:

其他情况:

如果写CommitLog失败,延迟10s再次执行Task。

如果当前delayLevel下没有新消息到达,延迟100ms再次执行Task。

二、事务消息

案例

案例来源于rocketmq-example,对于事务消息,逻辑都在producer和broker中。

首先定义一个TransactionListener,有两个方法:

  1. executeLocalTransaction:执行本地事务;
  2. checkLocalTransaction:响应broker回查本地事务执行状态;
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

生产者:

使用TransactionMQProducer,注入TransactionListener实现。

调用TransactionMQProducer#sendMessageInTransaction(String,Object)发送事务消息,这里第二个入参,对应TransactionListener#executeLocalTransaction执行本地事务的第二个入参,一般是业务模型。

public static void main(String[] args) {
    TransactionListener transactionListener = new TransactionListenerImpl();
    TransactionMQProducer producer = new TransactionMQProducer("my_tx_group");
    producer.setNamesrvAddr("localhost:9876");
    producer.setTransactionListener(transactionListener);
    producer.start();
    Message msg = 
        new Message("TopicTest1234", "TagA", "hello".getBytes(StandardCharsets.UTF_8));
    producer.sendMessageInTransaction(msg, null);
}

TransactionMQProducer

TransactionMQProducer继承普通生产者DefaultMQProducer。

相较于普通生产者多了两个属性:

  1. transactionListener,用户代码逻辑,负责执行本地事务和本地事务状态回查;
  2. executorService,线程池,负责执行本地事务状态回查;

默认本地事务状态回查线程池采用1线程2000队列。

TransactionMQProducer#sendMessageInTransaction:

这是事务消息生产者相较于普通消息生产者唯一新增的api,发送事务消息。

其底层还是调用内部生产实现DefaultMQProducerImpl

producer发送半消息

TransactionMQProducer#sendMessageInTransaction:

producer侧,发送半消息主要是设置Message的属性。

  1. 不支持延迟消息;
  2. 设置properties.TRAN_MSG=true,代表这是一条事务半消息;
  3. 设置properties.PGROUP=生产者组,用于消息回查;

后面send方法基本同普通消息,除了请求头的sysFlag设置事务PREPARE标志位。

borker接收半消息

接着,半消息来到broker。

SendMessageProcessor#sendMessage:

将客户端请求转换为MessageExeBrokerInner后,

识别到properties.TRAN_MSG=true,进入TransactionalMessageService#prepareMessage。

TransactionalMessageServiceImpl#prepareMessage:

TransactionalMessageBridge#putHalfMessage:

事务消息和延迟消息类似,将topic替换为RMQ_SYS_TRANS_HALF_TOPIC,queueId替换为0。

真实topic存储到REAL_TOPIC和REAL_QID。

区别在于事务消息并没有对ConsumeQueue做特殊处理,即tagHash不会变。

producer本地事务

DefaultMQProducerImpl#sendMessageInTransaction:

如果发送broker异常,直接抛出MQClientException。

如果发送broker成功,且SEND_OK,执行本地事务TransactionListener#executeLocalTransaction,如果本地事务执行抛出异常,state=UNKNOWN,localException非null;

如果发送broker成功,且非SEND_OK,包括刷盘超时、同步SLAVE超时、SLAVE不可用,不执行本地事务,state=ROLLBACK;

Message的transactionId就是生产者生成的messageId。

如果用户代码返回null,state=UNKNOWN。

producer提交/回滚/未知

DefaultMQProducerImpl#sendMessageInTransaction:

这是生产者的最后一步,不会抛出异常到用户代码。

即如果本地事务执行成功,生产者不用关心后续流程,由broker回查来确认最终事务状态。

DefaultMQProducerImpl#endTransaction:

首先通过broker返回的SendResult#offsetMsgId解析得到MessageId。

在第二章说到,broker会生成一个messageId给producer,这个messageId包含两部分:

  1. broker的ip和port,即MessageId#address;
  2. 消息在这个broker的commitlog的物理offset;

EndTransactionRequestHeader包含多个重要属性:

  1. commitLogOffset:commitlog物理offset;
  2. tranStateTableOffset:逻辑offset;
  3. producerGroup:生产组;
  4. commitOrRollback:二阶段提交/回滚/未知标志;

MQClientAPIImpl#endTransactionOneway:

最终EndTransactionRequestHeader以oneway请求的方式发送给broker。

对生产者来说,sendMessageInTransaction api主要耗时在发送半消息和执行本地事务,二阶段结果发送broker只需要提交到io线程即可。

broker提交/回滚/未知

对于END_TRANSACTION请求,broker用单独的业务线程池处理。

线程数:8+2*核数,队列10w。

EndTransactionProcessor#processRequest对于本地事务UNKNWON状态,不做任何处理。

EndTransactionProcessor#processRequest提交分为四步:

  1. commitMessage,根据commitlog的物理offset,找到HALF消息;
  2. 组装真实消息MessageExtBrokerInner;
  3. 调用MessageStore将真实消息写入commitlog(最终写入consumequeue被消费者感知到);
  4. 删除HALF消息;

EndTransactionProcessor#processRequest回滚分为两步:

  1. rollbackMessage,虽然方法名不同,但是和提交一致,根据commitlog的物理offset,找到HALF消息;
  2. 删除HALF消息;

综上,提交HALF消息,涉及一个真实消息和删除HALF消息;回滚HALF消息,仅涉及删除HALF消息。

并且整个流程没有对异常做任何处理,无论哪一步失败,都将由消息回查流程处理

真实消息

EndTransactionProcessor#endMessageTransaction

提交真实消息,和延迟消息类似,用真实topic和queue替换HALF消息的topic和queue

删除HALF消息(Op消息)

为什么要删除HALF消息?

众所周知,broker会启动一个定时任务,判断HALF消息是否已经被二阶段commit或rollback。

如果HALF消息还未收到二阶段指令,那么需要回查。

如果不删除HALF消息,那么会一直回查。

TransactionalMessageServiceImpl#deletePrepareMessage:

但rocketmq不支持删除消息,因为所有消息相关文件都是顺序写的。

所以对于二阶段提交/回滚,实际底层是发送一个Op消息

如果HALF消息有匹配的OP消息,代表HALF消息已经成功处理(收到二阶段指令);

反之代表HALF消息还未处理完成,可能需要回查producer。

TransactionalMessageBridge#putOpMessage:

通过HALF消息,反向构建一个MessageQueue模型,topic=HALF消息topic,queueId=0。

TransactionalMessageBridge#addRemoveTagInTransactionOp:

和客户端一样,构建了一个Message。

topic=RMQ_SYS_TRANS_OP_HALF_TOPIC(Op消息topic);

tag=d

body=HALF消息的逻辑offset

TransactionalMessageBridge#writeOp:

针对每个HALF消息topic的MessageQueue,映射到一个OP消息topic的MessageQueue。

实际上,HALF消息topic的队列数量为1,所以这里opQueueMap只有一个kv对。

最终还是将消息转换为MessaageExtBrokerInner写入MessageStore。

broker回查

如果在一定时间内,未收到producer发来的二阶段明确指示,即commit/rollback,broker会回查producer本地事务执行情况。

TransactionalMessageCheckService线程每隔60秒进行一次回查逻辑。

TransactionalMessageServiceImpl#check:

首先找到HALF_TOPIC下所有MessageQueue,前面broker接收半消息,queueId写死0,所以只有一个MessageQueue

查询HALF&OP消费进度

TransactionalMessageServiceImpl#check:

HALF_TOPIC的queue可以映射到一个OP_TOPIC的queue。

查询系统消费组CID_RMQ_SYS_TRANS对于两个queue的消费进度。

拉取Op消息32条

TransactionalMessageServiceImpl#check:

接下来,根据OP_TOPIC的消费进度,拉取OP消息,组装两个集合:

  1. doneOpOffset:op消息的body(op对应half消息逻辑offset)小于half消费进度,即op消息已处理;
  2. removeMap:op消息的body(op对应half消息逻辑offset)大于等于half消费进度,代表这些half消息,已经收到二阶段明确指令(commit/rollback);

TransactionalMessageServiceImpl#fillOpRemoveMap:

拉取op消息,一次最多32条op,比较op消息的body和当前half消息消费进度的大小。

op消息的body就是commit/rollback指令对应的half消息的逻辑offset

如果op消息的body(queueOffset)小于half消费进度(miniOffset),加入doneOpOffset,即这些op消息已经没有用了(client重复或超时commit或rollback一个half消息,见后面HALF回查),对应half消息已经处理了。

如果op消息的body(queueOffset)大于等于half消费进度(miniOffset),加入removeMap,这些op消息mapping到了half消息,即这些half消息已经被commit/rollback。

通过fillOpRemoveMap,removeMap中是所有二阶段处理完成的half消息,反之不在removeMap里的就是还未收到二阶段指令的half消息

如下图,假设现在half和op消费进度都是0,half消息已经发送了5条,而op消息只收到4条,可以看到half逻辑offset=3的这条消息还未收到二阶段指令。

消费HALF消息

TransactionalMessageServiceImpl#check:

while-true循环处理half消息。

如果half消息进度在removeMap中,代表收到二阶段指令,直接将op逻辑offset加入doneOpOffset中;

如果half消息进度不在removeMap中,可能需要执行回查,为什么说是可能,因为op一次才拉32条,不一定足够匹配当前half消息。

TransactionalMessageServiceImpl#check:

Step1,根据half消费进度(逻辑offset)查询消息。

如果没有找到,代表还没有新的half消息到达,直接退出

Step2,校验回查次数。

如果half消息回查超过15次,投递到topic=TRANS_CHECK_MAXTIME_TOPIC

如果half消息回查未超过15次,needDiscard将Message的properties的回查次数+1

Step3,如果这条half消息存储时间大于本轮检查的开始时间,直接退出。

Step4,如果这条half消息,还未到超时时间6s,直接退出,后面的half消息也不可能到超时时间

Step5,

如果需要回查,将half消息重新写入HALF_TOPIC,重入次数+1,调用producer回查事务状态

如果不需要回查,fillOpRemoveMap再拉32条op消息,进行下一轮判断

HALF回查

回写half消息

在向producer发送回查请求前,先要将当前的half消息重新写一份到commitlog

TransactionalMessageServiceImpl#putBackHalfMsgQueue:

注意,新half消息的逻辑offset和commitlog物理offset会填充到老half消息中

这意味着客户端收到的回查请求,一定是针对新half消息的,老half消息将被消费,老op消息将被视为失效后续进入doneOpOffset。

TransactionalMessageBridge#renewHalfMessageInner:根据老half消息,生成新half消息。

重新写入HALF_TOPIC的消息的生成时间戳是不会变的。

(TransactionalMessageServiceImpl#needDiscard校验回查次数的时候,已经将回查次数+1了)

发送CHECK_TRANSACTION_STATE请求

AbstractTransactionalMessageCheckListener#resolveHalfMsg:

half消息重新写入commitlog之后,才会异步发送回查请求给producer。

线程池:2核心线程,5最大线程,2000队列,CallerRuns拒绝策略。

AbstractTransactionalMessageCheckListener#sendCheckMessage:

broker根据生产组,找连接broker的一个生产者实例,发送CHECK_TRANSACTION_STATE请求。

其中包含transactionId(生产者messageId),producer可以用transactionId确认本地事务情况。

更新HALF&OP消费进度

当没有更新的half消息或half头部消息未超时,结束本轮校验,更新两个topic的消费进度。

half和op提交的情况非常相似:

  1. half匹配到op,正常收到producer二阶段commit/rollback;
  2. half未匹配到op且half超时,创建新half,老half在本轮提交,而后续收到的op会被视为无效也会被提交;

TransactionalMessageServiceImpl#calculateOpOffset:

需要注意的是,op消息并非只要匹配到half消息/half超时,op消息offset就可以提交到该位置

op提交只能提交到第一个未被half匹配的op消息

假设目前一共收到4条half消息,收到3个commit/rollback,关系如下。

其中half2尚未收到对应op且还未超时,于是结束本轮循环(即使half3能匹配op1,half的消费进度同样也不能跳)。

doneOffset中会包含0和2,此时opOffset只能前进到1,而不是3

如果opOffset前进到3,下一次half3消息就匹配不到op消息了。

producer回查

ClientRemotingProcessor#checkTransactionState:

producer收到CHECK_TRANSACTION_STATE。

DefaultMQProducerImpl#checkTransactionState:

提交回查请求到checkExecutor。(默认1线程2000队列)

执行用户TransactionListener#checkLocalTransaction。

最终还是走END_TRANSACTION响应broker。

总结

延迟消息

producer通过设置Message.properties的DELAY为指定延迟级别,发送延迟消息。

默认broker支持18个延迟级别从1s到2h,可以通过设置messageDelayLevel修改。

broker收到消息后,判断DELAY非空,识别为延迟消息。

写入commitlog前,修改原始消息:

  1. 替换message.topic=SCHEDULE_TOPIC_XXXX,message.queueId=delayLevel-1
  2. 设置properties.REAL_TOPIC=原topic,properties.REAL_QID=原queueId;

commitlog写入后,DispatchRequest中的tag的hashCode会被修改为目标投递时间

最终ConsumeQueue中的最后8字节变为目标投递时间戳。

broker针对每个delayLevel开启一个DeliverDelayedMessageTimerTask消费queueId=delayLevel-1中的延迟消息

需要注意的是,在4.x版本,所有延迟级别的task都使用同一个线程,用jdk的Timer实现。

Task拉取consumequeue记录后,进行判断:

  1. 如果consumequeue为空,延迟100ms再执行task;
  2. 如果consumequeue非空,判断consumequeue记录中的目标投递时间戳;
  3. 如果目标投递时间戳小于等于当前时间,则写入真实消息(从REAL_TOPIC和REAL_QID提取真实topic和queue)到commitlog,继续处理下一个consumequeue记录;
  4. 如果目标投递时间戳大于当前时间,则延迟(目标时间-当前时间)再执行task;

SCHEDULE_TOPIC_XXXX的消费进度存储在delayOffset.json中,每隔10s对内存中的消费进度进行持久化(和消费使用同一个timer线程)。

事务消息

producer需要指定生产者组和TransactionListener实现。

TransactionListener负责本地事务执行和本地事务状态查询。

正常流程

producer端使用TransactionMQProducer#sendMessageInTransaction发送事务消息。

Step1:producer发送half消息给broker。

设置properties.TRAN_MSG=true,代表这是一条事务半消息;

设置properties.PGROUP=生产者组,用于消息回查;

producer等待broker响应。

Step2:broker收到half消息,判断TRAN_MSG=true,处理事务消息。

broker将topic替换为RMQ_SYS_TRANS_HALF_TOPICqueueId替换为0;真实topic存储到REAL_TOPIC和REAL_QID

最终存储到commitlog,构建consumequeue。

Step3:producer收到broker响应,执行本地事务。

如果发送broker异常,直接抛出MQClientException,sendMessageInTransaction结束。

如果发送broker成功,但返回刷盘超时、同步SLAVE超时、SLAVE不可用,不执行本地事务,state=ROLLBACK;

如果发送broker成功,且SEND_OK,执行本地事务TransactionListener#executeLocalTransaction

  1. 如果本地事务执行抛出异常,state=UNKNOWN
  2. 如果本地事务执行返回null,state=UNKNOWN;
  3. 其他,state=本地事务执行返回;

Step4:producer发送END_TRANSACTION,告知broker本地事务执行状态state。

END_TRANSACTION请求是个oneway请求,且try-catch所有异常,如果有问题由broker补偿回查。

END_TRANSACTION绝大多数参数来源于half消息响应,包含half消息的物理offset、逻辑offset等。

至此TransactionMQProducer#sendMessageInTransaction结束。

broker处理END_TRANSACTION

对于state=UNKNOWN不处理;

对于state=ROLLBACK,根据物理offset查询half消息,写op消息;

对于state=COMMIT,根据物理offset查询half消息,写真实消息(REAL_TOPIC和REAL_QID替换),写op消息;

op消息相关属性:

  1. topic=RMQ_SYS_TRANS_OP_HALF_TOPIC
  2. queueId=0;(一个half消息queue映射到一个op消息queue,而half消息只有一个queue)
  3. tag=d
  4. body=half消息的逻辑offset

op消息用于标记一个half消息是否收到二阶段commit/rollback,

如果half消息超时未收到对应op消息会由broker发起回查。

回查

TransactionalMessageCheckService线程每隔60秒进行一次回查逻辑。

broker以group=CID_RMQ_SYS_TRANS的名义,消费half消息(RMQ_SYS_TRANS_HALF_TOPIC)和op消息(RMQ_SYS_TRANS_OP_HALF_TOPIC)。

Step1查询消费进度

分别查询half消息和op消息的消费进度。

Step2首次拉取32条op消息

如果op消息对应half消息(body中的half消息逻辑offset)已经被处理(body中的half消息offset小于当前half消费进度),op消息被忽略,加入doneOpOffset集合;

如果op消息对应half消息还未处理,op消息加入removeMap,key是half消息逻辑offset,value是op消息逻辑offset;

Step3消费HALF消息

根据当前half消费进度,进行while-true循环。

如果half消息在removeMap中,代表half消息已经匹配到op,收到二阶段指令,不用回查,将half消息加入doneOpOffset集合;

如果half消息不在removeMap中,可能代表half消息还未收到二阶段指令:

  1. 根据half消息逻辑offset,查询half消息,如果为空,代表还没有新的half消息到达,直接Step5;
  2. 判断half消息是否超过回查次数15,如果超出,将half消息投递到TRANS_CHECK_MAXTIME_TOPIC,继续Step3;
  3. 判断half消息的存储时间,如果还未到6s超时,直接Step5,因为后续half消息都未超时;
  4. 如果需要回查Step4后继续Step3,如果不回查同拉取更多op消息Step3;

Step4消息回查

broker侧

  1. 重新写一份half消息到commitlog,老half消息将被消费,新half消息需要匹配新的op消息;
  2. 异步发送CHECK_TRANSACTION_STATE给producer组内的producer实例,请求中是新half消息

producer侧:回调TransactionListener查询本地事务状态,返回END_TRANSACTION请求给broker(同上)。

Step5,上述while-true执行half消息消费完毕,提交half和op消息的消费进度

half和op提交offset的情况非常相似:

  1. half匹配到op,正常收到producer二阶段commit/rollback;
  2. half未匹配到op且half超时,创建新half,老half在本轮提交,而后续收到的op会被视为无效也会被提交;

需要注意的是,op消息并非只要匹配到half消息/half超时,op消息offset就可以提交到该位置

op提交只能提交到第一个未被half匹配的op消息

欢迎大家评论或私信讨论问题。

本文原创,未经许可不得转载。

欢迎关注公众号【程序猿阿越】。

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYKaFObH' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片