3.RocketMQ-消息生产

一、消息发送的几种方式

1. 同步

消息生产者向MQ发送消息时候,同步等待,直到服务器返回发送结果,即可靠的同步传输。

2. 异步

消息生产者向MQ发送消息时候,指定消息发送成功后的回调函数,调用消息API发送成功后,立刻返回,不阻塞消息发送流程,消息发送成功后或者失败后回调任务在一个新的线程中执行,即可靠的异步传输。

3. 单向

消息生产者向MQ发送消息时候,调用完消息发送API后,直接返回,不等待消息的发送结果,即单向

二、消息的高可用设计

2.1 消息发送重试机制

RocketMQ在消息发送失败的时候,默认会重试2次

2.2 故障规避机制

当消息第一次发送失败的时候,如果下一次消息发送还是路由到刚才的Broker上,其消息发送大概率是失败的,因此为了保证重试的可靠性,在重试的时候会尽量避开刚刚接收失败的Broker,而是选择其他Broker上的队列进行反送,从而提高消息的发送成功率
image.png

三、消息数据结构

RocketMQ消息封装类是org.apache.rocketmq.common.message.Message,相关属性如下:

    // 消息所属topiic
    private String topic;
    // 消息标记
    private int flag;
    // 扩展属性
    private Map<String, String> properties;
    // 消息体
    private byte[] body;
    // 事物d
    private String transactionId;

四、生产者启动流程分析

生产者默认实现org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl,查看其start方法实现如下:

image.png
这里注意创建的mqClientInstance实例,最终被MQClientManager管理,并且是单例的,所以同一个clientId只会创建一个MQClientInstance实例,clientId生成规则为:IP+instance+unitname,设想一下,如果同一台机器部署了两个应用程序,那么clientId岂不是相同?

为了避免这个问题,rocketMQ会自动的将instanceName设置为进程id避免互相影响。

五、消息发送流程

消息发送入口为org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernel
Impl,接口定义如下:


        private SendResult sendKernelImpl(final Message msg,
            final MessageQueue mq,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final TopicPublishInfo topicPublishInfo,
            final long timeout)

  • Message msg:待发送的消息
  • MessageQueue mq:消息要发送到的队列
  • CommunicationMode communicationMode:消息发送模式,SYNC、ASYNC、ONEWAY
  • SendCallback sendCallback:异步消息回调函数
  • TopicPublishInfo topicPublishInfo:主题路由信息
  • final long timeout:超时时间

消息发送整体流程如下:
image.png

5.1 获取Broker网格地址

image.png

根据MessageMQ信息获取Broker地址,如果未获取到,则从NameServer主动更新topic路由信息。

5.2 生成消息全局唯一id

image.png
为消息生成全局唯一id(雪花算法,ip+时间戳+递增序号),如果消息默认超过4KB,则对消息采用zip压缩,如果是事物消息,则设置消息类型为MessageSysFlag.TRANSACTION_PREPARED_TYPE;

5.3 消息增强

image.png
如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。

5.4 构建消息发送请求

image.png
消息体主要包含:生产者组、主题名称、默认创建主题key、队列数、队列id
消息标记、发送时间等等。

5.5 执行发送

RocketMQ支持三种消息发送,因此,这里也对应三种不同的消息发送方式。同步发送实现为MQClientAPIImpl#sendMessageAsync方法;异步发送为MQClientAPIImpl#sendMessageSync;单向比较简单,直接调用nettyClient发送请求即可;
以上实现其实都是调用的RemotingClient类,大家有对netty感兴趣的可以自行研究,这里就先不展开讲了。

六、小结

本文主要简单为大家介绍了下RocketMQ的消息相关设计与生产者一些概念和流程,下篇文章将会带着大家一起看看RocketMQ消费者相关流程实现

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYChGG1O' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片