这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
RocketMQ版本
- 5.1.0
入口
这里消息删除的源码入口其实不太好直接确定,我们可以通过启动broker
的代码开始即BrokerStartup
的main
方法
这里我们直接进入到controller.start();
方法看看
里面的代码很长,我们可以看到有一个方法startBasicService();
进入startBasicService();
方法后我们可以看到有一个this.messageStore.start();
这个messageStore
就是封在了我们的一些消息文件的操作 可以简单看看他的一些方法
由于方法众多,我们回到今天的主线,继续查看start()
方法的实现类即DefaultMessageStore
在start
方法里面我们可以看到一个比较核心的方法this.addScheduleTask();
看着似乎是启动一些定时任务,没错,我们的消息过期删除就是通过定时任务扫描然后删除的
进入定时任务我们就能看到我们的文件删除定时任务了
可以看到这个定时任务是broker启动后 60s后再启动,每次执行的时间间隔是可以配置的,默认10s
实际的过期文件删除逻辑还是被封在再DefaultMessageStore.this.cleanFilesPeriodically();
中
cleanFilesPeriodically
我们进入cleanFilesPeriodically
方法看看
private void cleanFilesPeriodically() {
this.cleanCommitLogService.run();
this.cleanConsumeQueueService.run();
this.correctLogicOffsetService.run();
}
可以看到有三个清理任务
- 清理
CommitLog
- 清理
ConsumeQueue
、index
- 更新逻辑偏移量即
ConsumeQueueStore
cleanCommitLogService
这里我们先看看CommitLog
是如何清理的
public void run() {
try {
this.deleteExpiredFiles();
this.reDeleteHangedFile();
} catch (Throwable e) {
DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
}
}
这里可以看到又分两步
- 删除已经失效的文件
- 再次删除第一次未删除掉的文件(比如文件被其他线程占用)
这里我们先看看deleteExpiredFiles
方法
deleteExpiredFiles
private void deleteExpiredFiles() {
int deleteCount = 0;
// 获取配置的过期时间 默认72小时 3天
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
// 获取删除文件的时间间隔 默认100毫秒
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
// 1000 * 120 120s 强制删除文件映射
int destroyMappedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// 每次删除文件的最大数量 默认10
int deleteFileBatchMax = DefaultMessageStore.this.getMessageStoreConfig().getDeleteFileBatchMax();
//是否是删除时间 默认凌晨 4点(04) 04:00~04:59
boolean isTimeUp = this.isTimeToDelete();
// 磁盘是否已满
boolean isUsageExceedsThreshold = this.isSpaceToDelete();
boolean isManualDelete = this.manualDeleteFileSeveralTimes > 0;
if (isTimeUp || isUsageExceedsThreshold || isManualDelete) {
if (isManualDelete) {
this.manualDeleteFileSeveralTimes--;
}
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
LOGGER.info("begin to delete before {} hours file. isTimeUp: {} isUsageExceedsThreshold: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {} deleteFileBatchMax: {}",
fileReservedTime,
isTimeUp,
isUsageExceedsThreshold,
manualDeleteFileSeveralTimes,
cleanAtOnce,
deleteFileBatchMax);
fileReservedTime *= 60 * 60 * 1000;
// 实际清理逻辑
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMappedFileIntervalForcibly, cleanAtOnce, deleteFileBatchMax);
if (deleteCount > 0) {
// 如果是 controller模式,还要清理 EpochFile
// If in the controller mode, we should notify the AutoSwitchHaService to truncateEpochFile
if (DefaultMessageStore.this.brokerConfig.isEnableControllerMode()) {
if (DefaultMessageStore.this.haService instanceof AutoSwitchHAService) {
final long minPhyOffset = getMinPhyOffset();
((AutoSwitchHAService) DefaultMessageStore.this.haService).truncateEpochFilePrefix(minPhyOffset - 1);
}
}
} else if (isUsageExceedsThreshold) {
LOGGER.warn("disk space will be full soon, but delete file failed.");
}
}
}
上面有一些代码都已经注释了,可以看到这里主要是做一些判断,是否徐要清理commitLog
有如下几个条件
- 是否在配置的时间内,默认4点-5点
- 是否磁盘已满
- 是否手动调度,默认执行20次
3这里的条件我们可以在这里确定
可以看到这个参数主要是给外部控制台去使用的
三个条件任意满足一个就去执行删除逻辑
实际的文件删除逻辑是在方法
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMappedFileIntervalForcibly, cleanAtOnce, deleteFileBatchMax);
commitLog.deleteExpiredFile
最终是调用的MappedFileQueue
的deleteExpiredFileByTime
方法
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately,
final int deleteFileBatchMax) {
//获取映射文件列表 commitlog文件可能随时有写入,copy一份不影响写入
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return 0;
int mfsLength = mfs.length - 1;
// 删除的文件数量
int deleteCount = 0;
// 需要删除的 MappedFile
List<MappedFile> files = new ArrayList<>();
int skipFileNum = 0;
if (null != mfs) {
//do check before deleting
checkSelf();
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
// 当前时间大于等于 liveMaxTimestamp 或者 cleanImmediately 为 true(手动清理或磁盘空间达到85%),表示文件过期或需要立即清理
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
if (skipFileNum > 0) {
log.info("Delete CommitLog {} but skip {} files", mappedFile.getFileName(), skipFileNum);
}
// 真正的删除逻辑
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;
if (files.size() >= deleteFileBatchMax) {
break;
}
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
skipFileNum++;
//avoid deleting files in the middle
break;
}
}
}
//从文件映射队列中删除对应的文件映射
deleteExpiredFile(files);
//返回删除的文件个数
return deleteCount;
}
上面的代码有一些注释,实际也是检测MappedFile
是否需要删除,最终的删除逻辑在 mappedFile.destroy(intervalForcibly)
方法
我们进去看看
@Override
public boolean destroy(final long intervalForcibly) {
// 释放内存
this.shutdown(intervalForcibly);
if (this.isCleanupOver()) {
try {
long lastModified = getLastModifiedTimestamp();
// 关闭文件通道
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
// 删除文件
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeElapsedTimeMilliseconds(beginTime)
+ "," + (System.currentTimeMillis() - lastModified));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
}
cleanConsumeQueueService 清理queue
这里我们直接查看deleteExpiredFiles
方法,也是从最外层的this.cleanConsumeQueueService.run();
方法进入的
private void deleteExpiredFiles() {
int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
// 获取commitlog 最小的偏移量
long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
// 如果最小物理偏移量大于上次记录的最小物理偏移量 说明commitLog被删除过了
if (minOffset > this.lastPhysicalMinOffset) {
this.lastPhysicalMinOffset = minOffset;
// 获取消费队列表
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.getConsumeQueueTable();
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
for (ConsumeQueueInterface logic : maps.values()) {
//删除过期文件,并返回删除的文件数量
int deleteCount = DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minOffset);
if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
try {
//当上一个ConsumeQueue成功删除之后,下一个ConsumeQueue删除需要等待0.1s
Thread.sleep(deleteLogicsFilesInterval);
} catch (InterruptedException ignored) {
// 忽略中断异常
}
}
}
}
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
}
}
这里最终的删除逻辑还是在deleteExpiredFile
中封在的
DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minOffset);
实际和上面commitLog
删除类似,这里我们不过多深究MappedByteBuffer
这种偏底层的细节。主要是了解主流程,后面再单独聊聊一些有深度的细节
idnex删除
实际我们在queue里面删除逻辑中会看到这个么一个方法
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
没错这里面就是删除索引index的文件
public void deleteExpiredFile(long offset) {
Object[] files = null;
try {
// 获取读锁
this.readWriteLock.readLock().lock();
// 如果索引文件列表为空,则直接返回
if (this.indexFileList.isEmpty()) {
return;
}
// 获取第一个索引文件的结束物理偏移量
long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
// 如果第一个索引文件的结束物理偏移量小于commitLog的最小偏移量,说明索引无效需要删除
if (endPhyOffset < offset) {
// 将索引文件列表转换为数组
files = this.indexFileList.toArray();
}
} catch (Exception e) {
LOGGER.error("destroy exception", e);
} finally {
this.readWriteLock.readLock().unlock();
}
if (files != null) {
List<IndexFile> fileList = new ArrayList<>();
for (int i = 0; i < (files.length - 1); i++) {
IndexFile f = (IndexFile) files[i];
// 如果文件的结束物理偏移量小于commitlog的最小偏移量
if (f.getEndPhyOffset() < offset) {
fileList.add(f);
} else {
break;
}
}
// 调用删除过期文件的方法
this.deleteExpiredFile(fileList);
}
}
可以看到索引的删除文件也是通过commitlog的最小偏移量来删除的。只要一个索引中的最大偏移量小于commitlog
的最小偏移量,说明这个索引是不需要的就可以删除
correctLogicOffsetService
最后一个是更新逻辑偏移量,即consumeQueueStore
private void correctLogicMinOffset() {
long lastForeCorrectTimeCurRun = lastForceCorrectTime;
// 获取commitlog的最小偏移量
long minPhyOffset = getMinPhyOffset();
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.getConsumeQueueTable();
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
for (ConsumeQueueInterface logic : maps.values()) {
if (Objects.equals(CQType.SimpleCQ, logic.getCQType())) {
// cq is not supported for now.
continue;
}
// 如果需要纠正
if (needCorrect(logic, minPhyOffset, lastForeCorrectTimeCurRun)) {
// 执行纠正操作
doCorrect(logic, minPhyOffset);
}
}
}
}
- doCorrect
private void doCorrect(ConsumeQueueInterface logic, long minPhyOffset) {
// 删除逻辑队列中过期的文件
DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minPhyOffset);
int sleepIntervalWhenCorrectMinOffset = DefaultMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetSleepInterval();
if (sleepIntervalWhenCorrectMinOffset > 0) {
try {
Thread.sleep(sleepIntervalWhenCorrectMinOffset);
} catch (InterruptedException ignored) {
}
}
}
总结
总的来说RocketMQ执行文件清理操作主要是在设置的凌晨4点(4:00-5:00),要清理的文件也是我们常见的CommitLog
、consumequeue
、Index
. 清理的条件就是三个
- 磁盘超过设置的默认85%
- 到达设置的清理时间默认凌晨4点
- 手动执行清理操作
三个条件满足一个即可
至于一些文件操作比如MappedByteBuffer
等不在本次探讨范围内,限于篇幅我们这次是看整理的逻辑。后续会基于单个点深入分析