【从0-1 千万级直播项目实战】线上发布时RPC调用经常404/503,被运营骂惨了

背景

在每次的开发迭代中,每次的改动点可能都涉及到多个服务,每次改动发版都需要重新发布,在发布过程中有用户使用必然会出现一定的404,503,rpc调用timeOut问题,就在某一天修复线上问题一天发布了N次,用户向运营投诉了,说怎么我们App经常报网络异常,经过排查得知,发布不够丝滑造成,具体表现如下

image.png

image.png

image.png

原因分析

上面我为什么要放三张图,是有深意的,因为这三张图,代表着我们要分析的三个表现层面,分别是Gateway,RPC,MQ

这里再补一张我们的系统调用链路图

image.png

由图可知,系统调用链路要么是经过网关转发-业务服务-RPC调用,要么是长连接-RPC调用,RPC之间又分别有多个调用,这时有人可能会问了,你这个RPC相互调用应该在设计中去规避,没错,我们的设计已经尽量规避了RPC调用链路过长和过度依赖RPC的问题,但对于一些query操作,很难完全避免,就比如一个用户想要获取自己的礼物背包,他的链路会是:

Gateway服务→gift服务(查询用户礼物背包)→user服务(获取用户信息)→base服务(获取翻译相关信息)

当时的分层/服务概念秉承的原则是职责划分与并发量划分,现在看来有利也有弊吧。

继续分析可能的异常原因:

  1. 发布过程中网关转发时找不到服务了,调用异常
  2. 发布过程中有正在执行的RPC请求/已经在线程池中提交了执行任务的RPC请求,在执行过程中发现RPC服务调用实例不存在了
  3. MQ、RPC、Nacos、SpringBoot容器没有平滑关闭?

问题复现

遇到问题,先别慌,多思考和动手实践,首先咱们先在本地/测试服模拟复现一下线上的场景

  • 写一个测试接口

image.png

  • 启动网关和多个测试接口的服务实例

image.png

  • 使用ApiFox创建一个多线程并发调用的用例来模拟用户使用过程中通过网关不间断的调用该测试接口

image.png

  • 查看调用结果

image.png

image.png

此时多个服务实例都在正常运行

  • 关掉其中一个服务实例,模拟滚动发布过程

image.png

发现马上抛出了异常,大概10秒之后又恢复了正常

  • 结论

服务调用时,从Nacos拉取的服务实例没有刷新,这里并不是指Nacos没有及时刷新,Nacos服务实时刷新问题我在以前微服务滚动发布方案文章中已经解决过,这指的是存在于每一个调用方的实例中拉取的Nacos服务实例在本地缓存中没有更新

解决方案

实例下线通知所有服务刷新负载均衡缓存

操作流程图

image.png

代码实现

我们使用的是SpringCloud LoadBalancer,查阅源码可知,使用了DefaultLoadBalancerCacheManager作为负载均衡缓存管理,源码如下

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package org.springframework.cloud.loadbalancer.cache;
import com.stoyanr.evictor.map.ConcurrentHashMapWithTimedEviction;
import com.stoyanr.evictor.scheduler.DelayedTaskEvictionScheduler;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.springframework.cache.Cache;
import org.springframework.cloud.loadbalancer.core.CachingServiceInstanceListSupplier;
import org.springframework.lang.Nullable;
public class DefaultLoadBalancerCacheManager implements LoadBalancerCacheManager {
private final ConcurrentMap<String, Cache> cacheMap;
public DefaultLoadBalancerCacheManager(LoadBalancerCacheProperties loadBalancerCacheProperties, String... cacheNames) {
this.cacheMap = new ConcurrentHashMap(16);
this.cacheMap.putAll((Map)this.createCaches(cacheNames, loadBalancerCacheProperties).stream().collect(Collectors.toMap(DefaultLoadBalancerCache::getName, (cache) -> {
return cache;
})));
}
public DefaultLoadBalancerCacheManager(LoadBalancerCacheProperties loadBalancerCacheProperties) {
this(loadBalancerCacheProperties, CachingServiceInstanceListSupplier.SERVICE_INSTANCE_CACHE_NAME);
}
private Set<DefaultLoadBalancerCache> createCaches(String[] cacheNames, LoadBalancerCacheProperties loadBalancerCacheProperties) {
return (Set)Arrays.stream(cacheNames).distinct().map((name) -> {
return new DefaultLoadBalancerCache(name, new ConcurrentHashMapWithTimedEviction(loadBalancerCacheProperties.getCapacity(), new DelayedTaskEvictionScheduler(this.aScheduledDaemonThreadExecutor())), loadBalancerCacheProperties.getTtl().toMillis(), false);
}).collect(Collectors.toSet());
}
private ScheduledExecutorService aScheduledDaemonThreadExecutor() {
return Executors.newSingleThreadScheduledExecutor((runnable) -> {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
thread.setDaemon(true);
return thread;
});
}
@Nullable
public Cache getCache(String name) {
return (Cache)this.cacheMap.get(name);
}
public Collection<String> getCacheNames() {
return Collections.unmodifiableSet(this.cacheMap.keySet());
}
}
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//


package org.springframework.cloud.loadbalancer.cache;

import com.stoyanr.evictor.map.ConcurrentHashMapWithTimedEviction;
import com.stoyanr.evictor.scheduler.DelayedTaskEvictionScheduler;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.springframework.cache.Cache;
import org.springframework.cloud.loadbalancer.core.CachingServiceInstanceListSupplier;
import org.springframework.lang.Nullable;

public class DefaultLoadBalancerCacheManager implements LoadBalancerCacheManager {
    private final ConcurrentMap<String, Cache> cacheMap;

    public DefaultLoadBalancerCacheManager(LoadBalancerCacheProperties loadBalancerCacheProperties, String... cacheNames) {
        this.cacheMap = new ConcurrentHashMap(16);
        this.cacheMap.putAll((Map)this.createCaches(cacheNames, loadBalancerCacheProperties).stream().collect(Collectors.toMap(DefaultLoadBalancerCache::getName, (cache) -> {
            return cache;
        })));
    }

    public DefaultLoadBalancerCacheManager(LoadBalancerCacheProperties loadBalancerCacheProperties) {
        this(loadBalancerCacheProperties, CachingServiceInstanceListSupplier.SERVICE_INSTANCE_CACHE_NAME);
    }

    private Set<DefaultLoadBalancerCache> createCaches(String[] cacheNames, LoadBalancerCacheProperties loadBalancerCacheProperties) {
        return (Set)Arrays.stream(cacheNames).distinct().map((name) -> {
            return new DefaultLoadBalancerCache(name, new ConcurrentHashMapWithTimedEviction(loadBalancerCacheProperties.getCapacity(), new DelayedTaskEvictionScheduler(this.aScheduledDaemonThreadExecutor())), loadBalancerCacheProperties.getTtl().toMillis(), false);
        }).collect(Collectors.toSet());
    }

    private ScheduledExecutorService aScheduledDaemonThreadExecutor() {
        return Executors.newSingleThreadScheduledExecutor((runnable) -> {
            Thread thread = Executors.defaultThreadFactory().newThread(runnable);
            thread.setDaemon(true);
            return thread;
        });
    }

    @Nullable
    public Cache getCache(String name) {
        return (Cache)this.cacheMap.get(name);
    }

    public Collection<String> getCacheNames() {
        return Collections.unmodifiableSet(this.cacheMap.keySet());
    }
}
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.cloud.loadbalancer.cache; import com.stoyanr.evictor.map.ConcurrentHashMapWithTimedEviction; import com.stoyanr.evictor.scheduler.DelayedTaskEvictionScheduler; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import org.springframework.cache.Cache; import org.springframework.cloud.loadbalancer.core.CachingServiceInstanceListSupplier; import org.springframework.lang.Nullable; public class DefaultLoadBalancerCacheManager implements LoadBalancerCacheManager { private final ConcurrentMap<String, Cache> cacheMap; public DefaultLoadBalancerCacheManager(LoadBalancerCacheProperties loadBalancerCacheProperties, String... cacheNames) { this.cacheMap = new ConcurrentHashMap(16); this.cacheMap.putAll((Map)this.createCaches(cacheNames, loadBalancerCacheProperties).stream().collect(Collectors.toMap(DefaultLoadBalancerCache::getName, (cache) -> { return cache; }))); } public DefaultLoadBalancerCacheManager(LoadBalancerCacheProperties loadBalancerCacheProperties) { this(loadBalancerCacheProperties, CachingServiceInstanceListSupplier.SERVICE_INSTANCE_CACHE_NAME); } private Set<DefaultLoadBalancerCache> createCaches(String[] cacheNames, LoadBalancerCacheProperties loadBalancerCacheProperties) { return (Set)Arrays.stream(cacheNames).distinct().map((name) -> { return new DefaultLoadBalancerCache(name, new ConcurrentHashMapWithTimedEviction(loadBalancerCacheProperties.getCapacity(), new DelayedTaskEvictionScheduler(this.aScheduledDaemonThreadExecutor())), loadBalancerCacheProperties.getTtl().toMillis(), false); }).collect(Collectors.toSet()); } private ScheduledExecutorService aScheduledDaemonThreadExecutor() { return Executors.newSingleThreadScheduledExecutor((runnable) -> { Thread thread = Executors.defaultThreadFactory().newThread(runnable); thread.setDaemon(true); return thread; }); } @Nullable public Cache getCache(String name) { return (Cache)this.cacheMap.get(name); } public Collection<String> getCacheNames() { return Collections.unmodifiableSet(this.cacheMap.keySet()); } }

ok 那我们就好办了,拿到对应的缓存对象,刷新对应实例的缓存Key就好了

监听实例下线事件后,通过MQ发送广播消息通知

image.png

监听广播消息后处理

@Slf4j(topic = "TOPIC_COMMON")
@Service
@MqCommonGroupConsumeEvent(event = RocketMqBizConstant.Common.Broadcast.NACOS_INSTANCE_CHANGE)
public class NacosInstanceChangeHandleMessage extends AbstractCommonGroupMqHandler {
@Resource
private CacheManager defaultLoadBalancerCacheManager;
@Override
public void handleMessage(String message) {
log.info("Nacos实例上下线事件处理 | {}", message);
NacosInstanceChangeMqDto dto = GsonUtil.GsonToBean(message, NacosInstanceChangeMqDto.class);
Cache cache = defaultLoadBalancerCacheManager.getCache("CachingServiceInstanceListSupplierCache");
if (cache != null) {
cache.evict(dto.getServiceName());
log.info("{}实例负载均衡缓存刷新完成", dto.getServiceName());
}
}
}
@Slf4j(topic = "TOPIC_COMMON")
@Service
@MqCommonGroupConsumeEvent(event = RocketMqBizConstant.Common.Broadcast.NACOS_INSTANCE_CHANGE)
public class NacosInstanceChangeHandleMessage extends AbstractCommonGroupMqHandler {


    @Resource
    private CacheManager defaultLoadBalancerCacheManager;
    @Override
    public void handleMessage(String message) {
        log.info("Nacos实例上下线事件处理 | {}", message);
        NacosInstanceChangeMqDto dto = GsonUtil.GsonToBean(message, NacosInstanceChangeMqDto.class);
        
        Cache cache = defaultLoadBalancerCacheManager.getCache("CachingServiceInstanceListSupplierCache");
        if (cache != null) {
            cache.evict(dto.getServiceName());
            log.info("{}实例负载均衡缓存刷新完成", dto.getServiceName());
        }
    }
}
@Slf4j(topic = "TOPIC_COMMON") @Service @MqCommonGroupConsumeEvent(event = RocketMqBizConstant.Common.Broadcast.NACOS_INSTANCE_CHANGE) public class NacosInstanceChangeHandleMessage extends AbstractCommonGroupMqHandler { @Resource private CacheManager defaultLoadBalancerCacheManager; @Override public void handleMessage(String message) { log.info("Nacos实例上下线事件处理 | {}", message); NacosInstanceChangeMqDto dto = GsonUtil.GsonToBean(message, NacosInstanceChangeMqDto.class); Cache cache = defaultLoadBalancerCacheManager.getCache("CachingServiceInstanceListSupplierCache"); if (cache != null) { cache.evict(dto.getServiceName()); log.info("{}实例负载均衡缓存刷新完成", dto.getServiceName()); } } }

这里要注意一个,你得看你项目使用的负载均衡器是什么,找到负载均衡器 像我们这边使用的就是默认的CachingServiceInstanceListSupplier

然后 执行cache.evict(dto.getServiceName()); 清除对应下线服务的Cache,至此此优化操作已完成

SpringBoot2.3之后的版本别忘了加上这个配置

spring:
lifecycle:
timeout-per-shutdown-phase: 20s
spring:
  lifecycle:
    timeout-per-shutdown-phase: 20s
spring: lifecycle: timeout-per-shutdown-phase: 20s

最多等待20s,在这个期间不会接受新的请求,但仍会处理正在进行中的请求,如果处理完毕,将关闭(如果没有处理完毕,也会关闭)

优化结果

  • [K8s滚动发布时不再出现404/503/调用异常的错误]
  • [不再出现发布过程中MQ/RPC接口未正常执行完成就强制退出的情况]
  • [99.99%的线上用户不再受发布过程中系统网络异常的Toast影响]

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYpnkm5l' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片