31.RocketMQ之消息拉取长轮询机制分析

消息拉取长轮询机制分析

RocketMQ未真正实现消息推模式,而是消费者主动向消息服务器拉取消息,RocketMQ推模式是循环向消息服务端发起消息拉取请求,如果消息消费者向RocketMQ拉取消息时,消息未到达消费队列。

消费者向 broker 拉取消息时,如果没有新消息到达消费队列。

如果没有启用长轮询机制,则会在服务端等待shortPollingTimeMills(默认1秒)时间后(挂起)再去判断消息是否已经到达指定消息队列,如果消息仍未到达则提示拉取消息客户端PULL—NOT—FOUND(消息不存在);

如果开启长轮询模式,RocketMQ一方面会每隔5s轮询检查一次消息是否可达,同时一有消息达到后立马通知挂起线程再次验证消息是否是自己感兴趣的消息,如果是则从CommitLog文件中提取消息返回给消息拉取客户端,否则直到挂起超时,超时时间由消息拉取方在消息拉取是封装在请求参数中,PUSH模式为15s即3次

PULL模式通过DefaultMQPullConsumer#BrokerSuspendMaxTimeMillis设置默认是20秒。RocketMQ通过在Broker客户端配置longPollingEnable为true来开启长轮询模式。默认就是开启。

长轮询、短轮询概述

消息拉取为了提高网络性能,在消息服务端根据拉取偏移量去物理文件查找消息时没有找到,并不立即返回消息未找到,而是会将该线程挂起一段时间,然后再次重试,直到重试。挂起分为长轮询或短轮询,在broker 端可以通过 longPollingEnable=true 来开启长轮询。

短轮询:longPollingEnable=false,第一次未拉取到消息后等待 shortPollingTimeMills时间后再试。shortPollingTimeMills默认为1S。

长轮询:longPollingEnable=true,会根据消费者端设置的挂起超时时间,受DefaultMQPullConsumer 的brokerSuspendMaxTimeMillis控制,默认20s,(brokerSuspendMaxTimeMillis),长轮询有两个线程来相互实现。

PullRequestHoldService:每隔5s重试一次。 DefaultMessageStore#ReputMessageService,每当有消息到达后,转发消息,然后调用PullRequestHoldService 线程中的拉取任务,尝试拉取,每处理一次,Thread.sleep(1), 继续下一次检查。

普通轮询和长轮询区别

  • 普通轮询比较简单,就是定时发起请求,服务端收到请求后不论数据有没有更新都立即返回

    优点就是实现简单,容易理解。 缺点就是服务端是被动的,服务端要不断的处理客户端连接,并且服务端无法控制客户端pull的频率以及客户端数量.

  • 长轮询是对普通轮询的优化,依然由客户端发起请求,服务端收到后并不立即响应而是hold住客户端连接,等待新消息产生后(或者超过指定时间15秒还未产生新消息)才回复客户端。

    说白了,就是对普通轮询加了个控制,你客户端可以随时请求我,但是回不回复我说了算,这就保证了服务端不会被客户端带节奏,导致自己的压力不可控。

    在 RocketMq 中消费者主动发起pull请求,broker在处理消息拉取请求时,如果没有查询到消息,将不返回消费者任何信息,而是先hold住并且挂起请求,使其不会立即发起下一次拉取请求,会将请求信息pullRequest添加到pullRequestTable中,等待触发通知消费者的事件。

当生产者发送最新消息过来后,首先持久化到commitLog文件,通过异步方式同时持久化consumerQueue和index。然后激活consumer发送来hold的请求,立即将消息通过channel写入consumer客户。 ​

如果没有消息到达且客户端拉取的偏移量是最新的,会hold住请求。其中hold请求超时时间 < 请求设定的超时时间。同时Broker端也定时检测是否请求超时,超时则立即将请求返回,状态code为NO_NEW_MESSAGE。

然后在Broker端,通过后台独立线程PullRequestHoldService遍历所有挂起的请求pullRequestTable,如果有消息,则返回响应给消费者。

同时,另外一个ReputMessageService线程不断地构建ConsumeQueue/IndexFile数据,不断的检测是否有新消息产生,如果有新消息,则从pullRequestTable通过Topic+queueId的key获取对应hold住的请求pullRequest,再根据其中的长链接channel进行通信响应。

通过这种长轮询机制,即可解决Consumer端需要通过不断地发送无效的轮询Pull请求,而导致整个RocketMQ集群中Broker端负载很高的问题。

RocketMQ 的长轮询机制由 2 个线程共同完成。PullRequestHoldService、ReputMessageService。

1.Push推送方式(即Server端推送消息给client):
当Server收到消息发送者发送过来的消息后,Server端主动把消息推送给client,这个方式实时性比较好。
消息实时,保持长链接,不会频繁建立链接。但是增加了Server的工作负担,对Server的性能造成影响。
Client如果不能够及时处理Server推送的消息,也是很大的问题。如果消息数量过大,消费者吞吐量小,可能会造成消费者缓冲区溢出。

2.Pull拉取方式(即Client从Server拉取消息):
Client循环的从Server拉取消息,由client控制着主动权。
弊端:
拉取消息的时间间隔不好设定,间隔太短循环空拉取造成资源浪费。
间隔时间太长,就会增加消息消费的延迟,影响业务使用。
消费进度offset需要consumer自己来维护,代码比较麻烦。



3.长轮询的消费方式
RocketMQ的消息消费方式,采用了“长轮询”方式,兼具了Push和Pull的有点,不过需要Server和Client的配合才能够实现。
即Client发送消息请求,Server端接受请求,如果发现Server队列里没有新消息,Server端不立即返回,而是持有这个请求一段时间(通过设置超时时间来实现),在这段时间内轮询Server队列内是否有新的消息,如果有新消息,就利用现有的连接返回消息给消费者;如果这段时间内没有新消息进入队列,则返回空。

这样消费消息的主动权既保留在Client端,也不会出现Server积压大量消息后,短时间内推送给Client大量消息使client因为性能问题出现消费不及时的情况。


长轮询的弊端:在持有消费者请求的这段时间,占用了系统资源,因此长轮询适合客户端连接数可控的业务场景中。
consumer 拉取消息,对应的 queue 如果没有数据,broker 不会立即返回,而是以一种长轮询的方式处理,把 PullReuqest 保存起来,等待 queue 有了消息后,或者长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest。

处理消费者拉取消息的请求。

主动检查

PullMessageProcessor#processRequest

//当没有拉取到消息时,通过长轮询方式继续拉取消息
case ResponseCode.PULL_NOT_FOUND:
    //默认都是true 允许长轮询和挂起
    if (brokerAllowSuspend && hasSuspendFlag) {
        //suspendTimeoutMillisLong 是 15秒
        long pollingTimeMills = suspendTimeoutMillisLong;
        //判断长轮询如果没开启 那么使用1秒作为挂起时间
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController
                				.getBrokerConfig().getShortPollingTimeMills();
        }


        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        //构建拉取请求对象
        PullRequest pullRequest 
            	= new PullRequest(request, channel, pollingTimeMills,
            			this.brokerController.getMessageStore().now(), 
             				offset, subscriptionData, messageFilter);
        //暂停和检查拉取请求
        this.brokerController.getPullRequestHoldService()
            		.suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }

一共处理了四个类型,我们关心的是在没有获取到数据的情况下是如何处理的,可以重点看一下ResponseCode.PULL_NOT_FOUND,表示没有拉取到数据,此时会调用PullRequestHoldService服务,从名字可以看出此服务用来hold住请求,不会立马返回,response被置为了null,不给客户端响应。

PullRequestHoldService#suspendPullRequest

该方法是将需要hold处理的PullRequest放入到一个ConcurrentHashMap中,等待被检查;

//将拉取消息请求,放置在ManyPullRequest集合中
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    mpr.addPullRequest(pullRequest);
}

下面重点看一下PullRequestHoldService.

PullRequestHoldService#run

此方法主要就是通过不停的检查被hold住的请求,检查是否已经有数据了,检查的对象是在ResponseCode.PULL_NOT_FOUND中调用的suspendPullRequest方法放入到ConcurrentHashMap中的需要hold处理的PullRequest。

public void run() {

    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            //如果开启长轮询每隔5秒判断消息是否到达
            //await 5秒
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                //没有开启长轮询,隔1s再次尝试
                //await 1秒
              this.waitForRunning(this.brokerController
                   					  .getBrokerConfig()
                                  	  .getShortPollingTimeMills());
            }

            long beginLockTimestamp = this.systemClock.now();
            //检查挂起的请求
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info("{} service end", this.getServiceName());
}

具体的检查代码在checkHoldRequest中

PullRequestHoldService#checkHoldRequest

此方法用来获取指定messageQueue下最大的offset,然后用来和当前的offset来比较,来确定是否有新的消息到来。

//遍历拉取任务
private void checkHoldRequest() {
    for (String key : this.pullRequestTable.keySet()) {
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            //获得最大的消息偏移量
            final long offset = this.brokerController.getMessageStore()
                					.getMaxOffsetInQueue(topic, queueId);
            try {
                //通知有消息达到
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}

往下看notifyMessageArriving方法:

PullRequestHoldService#notifyMessageArriving

方法中两个重要的判定就是:

1.比较当前的offset和maxoffset,看是否有新的消息到来,有新的消息返回客户端;

2.另外一个就是比较当前的时间和阻塞的时间,看是否超过了最大的阻塞时间,超过也同样返回;

此方法不光在PullRequestHoldService服务类中循环调用检查,同时在DefaultMessageStore中消息被存储的时候调用;

其实就是主动检查和被动通知两种方式。

//如果拉取消息偏移量大于请求偏移量,如果消息匹配调用executeRequestWhenWakeup处理消息
if (newestOffset > request.getPullFromThisOffset()) {
    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
    //判断是否匹配
    if (match && properties != null) {
        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
    }
	//如果匹配
    if (match) {
        try {
            //处理完之后不会挂起
            this.brokerController.getPullMessageProcessor()
                	.executeRequestWhenWakeup(request.getClientChannel(),
                request.getRequestCommand());
        } catch (Throwable e) {
            log.error("execute request when wakeup failed.", e);
        }
        continue;
    }
}
//当前时间>=15秒 + 创建pullRequest的时间 说明已经到达挂起最大时间
//如果已经到达挂起最大时间,则不继续等待将直接返回给客户端消息未找到
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
    try {
        this.brokerController.getPullMessageProcessor()
            			.executeRequestWhenWakeup(request.getClientChannel(),
            								request.getRequestCommand());
    } catch (Throwable e) {
        log.error("execute request when wakeup failed.", e);
    }
    continue;
}

PullMessageProcessor#executeRequestWhenWakeup

这里的核心亮点是:在调用 PullMessageProcessor.this.processRequest(channel, request, false) 方法是,最后一个参数是 false,表示 broker 端不支持挂起,这样在 PullMessageProcessor 方法中,如果没有查找消息,也不会继续再挂起了,因为进入这个方法时,拉取的偏移量是小于队列的最大偏移量,正常情况下是可以拉取到消息的。

public void executeRequestWhenWakeup(final Channel channel,
        final RemotingCommand request) throws RemotingCommandException {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);

                    if (response != null) {
                        response.setOpaque(request.getOpaque());
                        response.markResponseType();
                        try {
                            channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
                                @Override
                                public void operationComplete(ChannelFuture future) throws Exception {
                                    if (!future.isSuccess()) {
                                        log.error("processRequestWrapper response to {} failed",
                                            future.channel().remoteAddress(), future.cause());
                                        log.error(request.toString());
                                        log.error(response.toString());
                                    }
                                }
                            });
                        } catch (Throwable e) {
                            log.error("processRequestWrapper process request over, but response failed", e);
                            log.error(request.toString());
                            log.error(response.toString());
                        }
                    }
                } catch (RemotingCommandException e1) {
                    log.error("excuteRequestWhenWakeup run", e1);
                }
            }
        };
        this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
    }

客户端的回调

 //消费端拉取的回调
 public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);


                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);

                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }

                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }

                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());


                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);

                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue()
                	.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }
				//放入pullRequestQueue
                //this.pullRequestQueue.put(pullRequest);
                DefaultMQPushConsumerImpl.this
                		.executePullRequestLater
                			(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }

总结

消费者拉取消息的时候,如果没有拉取到消息。需要hold住拉取消息的请求,放入1个map中。每5秒检查这些 放在map中的拉取消息的请求。如果有新消息到达,判断新消息是否是匹配的消息,如果匹配立刻返回。一共会检查3次,也就是15秒没有匹配的消息到达就立刻返回。

消息拉取是异步的。服务端处理完之后,给客户端响应,客户端会回调其中的PullCallback,其中在处理完消息之后,重要的一步就是再次把pullRequest放到PullMessageService服务中,等待下一次的轮询。

监听器被动通知

DefaultMessageStore#start

如果开启了长轮询机制,PullRequestHoldService会每隔5s被唤醒去尝试检测是否有新的消息的到来才给客户端响应,或者直到超时才给客户端进行响应,消息实时性比较差,为了避免这种情况,RocketMQ引入另外一种机制:当消息到达时唤醒挂起线程触发一次检查。使用的是监听器!

时机:在新消息到达时需要对新消息进行索引的建立。

//长轮询入口
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();

DefaultMessageStore$ReputMessageService#run

public void run() {

    DefaultMessageStore.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            //长轮询核心逻辑代码入口
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn
                	(this.getServiceName() + " service has exception. ", e);
        }
    }


    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

DefaultMessageStore$ReputMessageService#deReput

private void doReput() {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
      	if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
             break;
         }
         SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);   // @1
         if (result != null) {
         	try {
                        this.reputFromOffset = result.getStartOffset();
                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {  //@2
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); //@3
                            int size = dispatchRequest.getMsgSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);  // @4


                                    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {                                              // @5
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                    }

                                    this.reputFromOffset += size;
                                    readSize += size;
                                    if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                            .addAndGet(dispatchRequest.getMsgSize());
                                    }
                                } else if (size == 0) {
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                    readSize = result.getSize();
                                }
                            } else if (!dispatchRequest.isSuccess()) {

                                if (size > 0) {
                                    log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                    this.reputFromOffset += size;
                                } else {
                                    doNext = false;
                                    if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                        log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                            this.reputFromOffset);


                                        this.reputFromOffset += result.getSize() - readSize;
                                    }
                                }
                            }
                        }
                } finally {
                     result.release();
                }
          } else {
                doNext = false;
          }
    }
}

代码@1:根据偏移量读取偏移量+到 commitlog文件中有效数据的最大偏移量。如果未找到数据,结束doReput 方法。 代码@2:循环从 SelectMappedBufferResult 中读取消息,每次读取一条。 代码@3:从 SelectMappedBufferResult 中读取一条消息,生成 DispatchRequest 对象。 代码@4:根据 comitlog 文件内容实时构建 consumequeue、index文件的关键所在,该部分详情请参考:blog.csdn.net/prestigedin… 代码@5:如果开启了长轮询并且角色为主节点,则通知有新消息到达,执行一次 pullRequest 验证。 NotifyMessageArrivingListener 代码,最终调用 pullRequestHoldService 的 notifyMessageArriving 方法,进行一次消息拉取。

只要待拉取偏移量小于消息消费队列的最大偏移量,既可以被唤醒进行消息拉取。

NotifyMessageArrivingListener#arriving

public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    //挂起拉取消息的线程
    this.pullRequestHoldService.notifyMessageArriving
        	(topic, queueId, logicOffset, tagsCode,
        			msgStoreTime, filterBitMap, properties);
}

总结

1.consumer发送request,broker通过网络层拿到request后放入PullMessageProcessor执行。执行内容:1.做权限验证;2.通过commitlog && consumerQueue获取message,若获取结果是SUCCESS后,通过channel.writeAndFlush写回到consumer端。若获取结果是PULL_NO_FOUND,把该请求hold住,把PullRequest包装成PR参数,放入PullRequestHoldService的pullRequestTable(key:topic@queueid,value:list),而暂时不给consumer端响应。

2.producer发送数据,被网络层接收后由sendMessageProcessor处理,放入commitlog,无论是放入堆外内存或是pagecache,commitlog的maxOffset会变动,会触发reputMessageService,该service把commitlog中数据dispatch至commitQueue中,通知执行messageArrivingListener.arrving来通知PullRequestHoldService

3.PullRequestHoldService收到arrvintg通知,把之前hold住的pullRequest开始执行。

若是判断出消息已发送过,PullMessageHoldService会把pullRequest请求异步由PullMessageProcessor执行,且brokerAllowSuspend=false,表示不允许broker再次suspend。

若是pullRequest被hold的时间太久,也会把pullRequest请求异步由PullMessageProcessor执行,且把brokerAllowSuspend=false。

4.PullMessageProcessor针对再次放入的pullRequest,做权限验证,获取message,若判断是消息已发出,会返回SUCCESS;若是超时,可能仍会是PULL_NOT_FOUND,都会调用channel.writeAndFlush,写回给consumer端。

问题:PullRequestHoldService.suspendPullRequest,想要hold住的那条消息还未放入PullRequestTable中,但producer发送的这条消息已经过来,并开始通知了,hold住的那条消息会一直存在 PullRequestTable中,mq是如何处理?

request被hold的时间太久,强制唤醒,返回consumer端null,再次拉请求发送到broker,若仍是not found的话,则新一轮的hold住

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYQadJd7' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片