一、消息发送的几种方式
1. 同步
消息生产者向MQ发送消息时候,同步等待,直到服务器返回发送结果,即可靠的同步传输。
2. 异步
消息生产者向MQ发送消息时候,指定消息发送成功后的回调函数,调用消息API发送成功后,立刻返回,不阻塞消息发送流程,消息发送成功后或者失败后回调任务在一个新的线程中执行,即可靠的异步传输。
3. 单向
消息生产者向MQ发送消息时候,调用完消息发送API后,直接返回,不等待消息的发送结果,即单向
二、消息的高可用设计
2.1 消息发送重试机制
RocketMQ在消息发送失败的时候,默认会重试2次
2.2 故障规避机制
当消息第一次发送失败的时候,如果下一次消息发送还是路由到刚才的Broker上,其消息发送大概率是失败的,因此为了保证重试的可靠性,在重试的时候会尽量避开刚刚接收失败的Broker,而是选择其他Broker上的队列进行反送,从而提高消息的发送成功率
三、消息数据结构
RocketMQ消息封装类是org.apache.rocketmq.common.message.Message,相关属性如下:
// 消息所属topiic
private String topic;
// 消息标记
private int flag;
// 扩展属性
private Map<String, String> properties;
// 消息体
private byte[] body;
// 事物d
private String transactionId;
四、生产者启动流程分析
生产者默认实现org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl,查看其start方法实现如下:
这里注意创建的mqClientInstance实例,最终被MQClientManager管理,并且是单例的,所以同一个clientId只会创建一个MQClientInstance实例,clientId生成规则为:IP+instance+unitname,设想一下,如果同一台机器部署了两个应用程序,那么clientId岂不是相同?
为了避免这个问题,rocketMQ会自动的将instanceName设置为进程id避免互相影响。
五、消息发送流程
消息发送入口为org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernel
Impl,接口定义如下:
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout)
- Message msg:待发送的消息
- MessageQueue mq:消息要发送到的队列
- CommunicationMode communicationMode:消息发送模式,SYNC、ASYNC、ONEWAY
- SendCallback sendCallback:异步消息回调函数
- TopicPublishInfo topicPublishInfo:主题路由信息
- final long timeout:超时时间
消息发送整体流程如下:
5.1 获取Broker网格地址
根据MessageMQ信息获取Broker地址,如果未获取到,则从NameServer主动更新topic路由信息。
5.2 生成消息全局唯一id
为消息生成全局唯一id(雪花算法,ip+时间戳+递增序号),如果消息默认超过4KB,则对消息采用zip压缩,如果是事物消息,则设置消息类型为MessageSysFlag.TRANSACTION_PREPARED_TYPE;
5.3 消息增强
如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。
5.4 构建消息发送请求
消息体主要包含:生产者组、主题名称、默认创建主题key、队列数、队列id
消息标记、发送时间等等。
5.5 执行发送
RocketMQ支持三种消息发送,因此,这里也对应三种不同的消息发送方式。同步发送实现为MQClientAPIImpl#sendMessageAsync方法;异步发送为MQClientAPIImpl#sendMessageSync;单向比较简单,直接调用nettyClient发送请求即可;
以上实现其实都是调用的RemotingClient类,大家有对netty感兴趣的可以自行研究,这里就先不展开讲了。
六、小结
本文主要简单为大家介绍了下RocketMQ的消息相关设计与生产者一些概念和流程,下篇文章将会带着大家一起看看RocketMQ消费者相关流程实现