一、前言
RocketMQ是一款最初由阿里巴巴开源、后面贡献给Apache并成为顶级项目。
这次文章主要将的是如何搭建RocketMQ的源码调试环境。有了源码环境后,可以通过官方的单元测试进行调试、有利于更进一步去理解设计和源码。
二、源码下载
- 给出官方的地址:
https://github.com/apache/rocketmq/releases
- 本次下载版本为:
rocketmq-all-4.9.6
,对应的链接是:https://github.com/apache/rocketmq/archive/refs/tags/rocketmq-all-4.9.6.zip
- 下载zip并进行解压,通过 Idea 导入即可,最后的效果图是。
稍微讲一下对应的目录:
- acl、access control list 访问控制列表
- broker、Broker角色的代码
- client、客户端(消费者、生产者)的代码,类sdk
- common、公共模块
- distribution、用于部署RocketMQ,如bin目录、conf目录
- example、使用RocketMQ的使用案例(比较全)
- filter,RocketMQ的相关过滤器如
- logging、日志输出
- namesrv、RocketMQ命名服务角色的代码
- openmessaging、开放消息标准
- remoting、RocketMQ用于远程访问的模块代码
- srvutil、用于管理和操作NameServer的命令行工具
- store、负责消息的存储和管理
- test、单元测试代码
- tools、包含多个实用的集合,用于辅助RocketMQ的部署、管理和调试
三、源码环境搭建
大体上,环境的准备以及角色的启动和实际场景中差不多。我列举一下:
- 启动命名服务
NameServer
:因为Broker启动过程中需要注册进去 - 启动Broker
- 通过RocketMQ提供的案例进行测试
3.1 NameServer启动
3.1.1 修改启动参数
先在Idea界面依次点击
然后会弹框斤进行环境变量配置、对应的变量名为:ROCKETMQ_HOME
接下来保存好后,进入下一步。
PS:ROCKETMQ_HOME
的路径是自定义的!自定义的!自定义的!我这里为了方便直接放D盘了。
3.1.2 拷贝配置文件
接下来,在刚配的ROCKETMQ_HOME文件夹下面,创建3个子文件夹。
conf:为了放配置文件;
logs:为了放日志文件;
store:为了放消息存储类的文件;
建完文件夹,我们需要去源码复制一些配置文件。
把这3货copy到conf文件夹下面,最终效果如下:
3.1.3 修改配置文件
首先来改broker.conf
# 这是nameserver的地址
namesrvAddr=127.0.0.1:9876
brokerIP1=192.168.3.6
# 这是存储路径,你设置为你的rocketmq运行目录的store子目录
storePathRootDir=<你自己ROCKETMQ_HOME路径>/store
# 这是commitLog的存储路径
storePathCommitLog=<你自己ROCKETMQ_HOME路径>/store/commitlog
# consume queue文件的存储路径
storePathConsumeQueue=<你自己ROCKETMQ_HOME路径>E/store/consumequeue
# 消息索引文件的存储路径
storePathIndex=<你自己ROCKETMQ_HOME路径>/store/index
# checkpoint文件的存储路径
storeCheckpoint=<你自己ROCKETMQ_HOME路径>/store/checkpoint
# abort文件的存储路径
abortFile=<你自己ROCKETMQ_HOME路径>/store/abort
稍微讲下参数的意思:
namesrvAddr指的是命名服务NameServer的IP地址;
brokerIP1指的是broker对应的IP地址;
3.1.4 启动
启动这步非常简单,直接点击即可。
当输出以下内容则算是成功了。
3.2 Broker启动
Broker启动也是非常简单。我们依旧需要进行几步:
- 修改启动参数
- 启动
3.2.1 修改启动参数
先在Idea界面依次点击
在弹出窗口中,我们需要配置两个参数:
- Program arguments
- Environment variables
3.2.2 启动
启动也是按照图的步骤来即可。
最终启动的效果图如下:
3.3 案例测试
很漂亮,我们已经快到最后一步了。现在只需要做两件事:
- 发送者发送消息
- 消费者处理消息
“什么?要写简易demo吗?我不想写”。
不,rocketMQ非常贴心的帮你找好了demo,就在example模块里面。
3.3.1 Producer demo 生产者
按照上述进行点击。
图中producer是生产者、consumer是消费者;先打开Producer.java,然后进行IP配置。下面我就直接粘贴代码了。
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static final int MESSAGE_COUNT = 1000;
public static final String PRODUCER_GROUP = "groupName";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TopicTest";
public static final String TAG = "TagA";
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
producer.setProducerGroup(PRODUCER_GROUP);
// 配置命名服务
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// 启动
producer.start();
// 循环发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
try {
Message msg = new Message(TOPIC /* Topic */,
TAG /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
// 关闭
producer.shutdown();
}
}
然后点击启动,若是下图效果则成功
3.3.2 Consumer demo 消费者
同理,消费者也需要对Consumer.java进行处理:
- 打开Consumer.java
- 修改注册中心地址
- 运行
下面是对应的代码
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
/**
* This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.
*/
public class Consumer {
public static final String CONSUMER_GROUP = "groupName";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TopicTest";
public static void main(String[] args) throws InterruptedException, MQClientException {
/*
* 初始化
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
/*
设置注册中心
*/
consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* 订阅 topic
*/
consumer.subscribe(TOPIC, "*");
/*
* 注册回调函数,便于进行监听
*/
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动并持续监听
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
对应的运行效果图
3.3 存储级别校验
这时候我们来看store文件夹,我们会发现多了很多文件夹以及文件,也就是属于RocketMQ的存储系统相关的文件。下节我会继续去分析对应的存储模型。
四、问题记录
4.1 closeChannel: close the connection to remote address[] result: true
这个报错一般是生产者/消费者已经连上了命名服务,但是没有对应的消费者/生产者地址导致的;
4.2 org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
连接超时。这个报错一般是,你的broker.conf对应的brokerIP地址没写对,导致生产者连接broker的时候,超时导致;
4.3 java: 程序包xxx不存在
当你遇到类似的报错的时候
请记住这一般是jdk的问题。解决的方式:
- 确定你目前jdk的版本。目前我自己运行在jdk1.8和jdk11是没什么问题
- 修改模块的jdk版本为你目前运行的版本(修改
Target bytecode version
)
- 修改对应项目的版本
再重新启动试试,祝好运 🙂
五、关注我
微信搜索公众号:野区杰西