23.RocketMQ之NameServer处理Broker心跳包,更新本地路由信息

NameServer处理Broker心跳包,更新本地路由信息

DefaultRequestProcessor继承自NettyRequestProcessor:处理各种客户端的请求,如果请求类型是为REGISTER_BROKER,则将请求转发到RouteInfoManager#regiesterBroker,主要是服务器端 或者客户端或者broker发送心跳,看下如何处理请求类型是为REGISTER_BROKER的请求吧

DefaultRequestProcessor#processRequest

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
​
    if (ctx != null) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }
​
    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        //注册broker
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = 
                    MQVersion.value2Version(request.getVersion());
           if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        //取消注册broker    
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        //拉取路由元信息
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
            //更新nameConfig
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
            //拉取nameConfig
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}

注册broker,维护topic和队列信息

DefaultRequestProcessor#registerBroker

public RemotingCommand registerBroker(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {

    final RemotingCommand response 
                    = RemotingCommand.createResponseCommand
                                (RegisterBrokerResponseHeader.class);



    final RegisterBrokerResponseHeader responseHeader
                = (RegisterBrokerResponseHeader)response.readCustomHeader();

    final RegisterBrokerRequestHeader requestHeader
                = (RegisterBrokerRequestHeader)request
                            .decodeCommandCustomHeader          
                                (RegisterBrokerRequestHeader.class);


​
    if (!checksum(ctx, request, requestHeader)) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("crc32 not match");
        return response;
    }
    TopicConfigSerializeWrapper topicConfigWrapper;
    if (request.getBody() != null) {
        topicConfigWrapper = TopicConfigSerializeWrapper
            .decode(request.getBody(), TopicConfigSerializeWrapper.class);
    } else {
        topicConfigWrapper = new TopicConfigSerializeWrapper();
        topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
        topicConfigWrapper.getDataVersion().setTimestamp(0);
    }

    //注册broker 维护路由信息
    RegisterBrokerResult result = this.namesrvController
                                    .getRouteInfoManager()
                                    .registerBroker(
                                        requestHeader.getClusterName(),
                                        requestHeader.getBrokerAddr(),
                                        requestHeader.getBrokerName(),
                                        requestHeader.getBrokerId(),
                                        requestHeader.getHaServerAddr(),
                                        topicConfigWrapper,
                                        null,
                                        ctx.channel());

    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());
    byte[] jsonValue = this.namesrvController
                            .getKvConfigManager()
                            .getKVListByNamespace
                            (NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            //加锁
            this.lock.writeLock().lockInterruptibly();
            //维护clusterAddrTable
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            //将当前broker添加到集合中
            brokerNames.add(brokerName);

            boolean registerFirst = false;
            //维护brokerAddrTable
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData
                        (clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            //非第一次注册,更新Broker
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();

            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
                }
            }
​
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);
            //维护topicQueueTable
            if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged
                            (brokerAddr, topicConfigWrapper.getDataVersion()) 
                    || registerFirst) {

                    ConcurrentMap<String, TopicConfig> tcTable = 
                                topicConfigWrapper.getTopicConfigTable();

                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            //HashMap<String/* topic */, List<QueueData>> topicQueueTable;
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }
            //维护brokerLiveTable
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }
            //维护filterServerList
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }
​
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }
​
    return result;
}
RouteInfoManager#createAndUpdateQueueData
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
   // HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    //创建QueueData QueueData就是topicQueueTable对应的List中的元素
    QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
    //获得topicQueueTable中队列集合
    List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
    //topicQueueTable为空,则直接创建一个list并将queueData添加到队列集合
    if (null == queueDataList) {
        queueDataList = new LinkedList<QueueData>();
        queueDataList.add(queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
        log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
    } else {
        //判断是否是新的队列
        boolean addNewOne = true;
        Iterator<QueueData> it = queueDataList.iterator();
        while (it.hasNext()) {
                QueueData qd = it.next();

                //假如原来topicA对应的是braoker-a broker-b
                //现在新增broker-c
                //那么qd.getBrokerName().equals(brokerName) 肯定是不相等的
                //这样才能加入queueDataList

                //如果brokerName相同,代表不是新的队列
                if (qd.getBrokerName().equals(brokerName)) {
                        if (qd.equals(queueData)) {
                            addNewOne = false;
                        } else {
                            it.remove();
                        }
                    }
            }
        //如果是新的队列,则添加队列到queueDataList
        if (addNewOne) {
            queueDataList.add(queueData);
        }
    }
}

image.png

生产者&消费者如何定时拉取路由元信息?

是在哪里请求的,多久请求一次?

RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不会主动推送给客户端,而是由客户端定时拉取主题最新的路由.客户端指的是消费者和生产者。

发起getRouteInfoByTopic请求的是MQClientInstance。生产者和消费者都会启动MQClientInstance。

MQClientInstance会启动1个定时任务每30秒拉一次最新的路由信息。

此时会由DefaultRequestProcessor处理这些请求。请求的key是GET_ROUTEINFO_BY_TOPIC.

具体代码如下:

 case RequestCode.GET_ROUTEINFO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request);

DefaultRequestProcessor#getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {

    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
    // 调用RouteInfoManager的方法
    // 从路由表topicQueueTable、brokerAddrTable、filterServerTable中
    // 分别填充TopicRouteData的List<QueueData>、List<BrokerData>、filterServer
    TopicRouteData topicRouteData
                    = this.namesrvController
                        .getRouteInfoManager()
                        .pickupTopicRouteData(requestHeader.getTopic());
    //如果找到主题对应你的路由信息并且该主题为顺序消息
    //则从NameServer KVConfig中获取关于顺序消息相关的配置填充路由信息
    if (topicRouteData != null) {
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController
                        .getKvConfigManager().getKVConfig
                                (NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                                                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }
​
        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
​
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark
       ("No topic route info in name server for the topic: " +  
         requestHeader.getTopic()+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYiI9Lti' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片