1、网络 IO 模型
IO 多路复用常见的有 select, poll, epoll。
在 netty 中,要使用 epollo,需要使用 EpollEventLoopGroup
, 要使用 select,需要使用 NioEventLoopGroup
。
下面,来看下 dubbo 在创建 EventLoopGroup
时,使用的是哪个类.
netty 在创建 EventLoopGroup
都是通过 NettyEventLoopFactory#eventLoopGroup(int, String)
去创建。
- 默认不开启 epollo
- 当
netty.epoll.enable
为 true 且操作系统为 linux 时, epollo 才生效
private static boolean shouldEpoll() {
Configuration configuration = ApplicationModel.getEnvironment().getConfiguration();
if (configuration.getBoolean("netty.epoll.enable", false)) {
String osName = configuration.getString("os.name");
return osName.toLowerCase().contains("linux") && Epoll.isAvailable();
}
return false;
}
为什么默认不使用 epollo?
个人看法:一般而言在连接不多的情况下,select 会比 epollo 更高效。而对于大部分应用程序,不会有太多的节点,因此使用 select 比 epollo 更合理。
1.1、业务线程设计
netty 的网络模型使用的是 reactor 模式。在 reactor 模式中,将线程分为了 bossGroup、workGroup、业务线程。
bossGroup 用于处理连接。建立连接后,将连接交给 workGroup, workGroup 负责读写。读写处理完后,再将消息转发给业务线程。
在 dubbo 中,业务逻辑处理使用的是业务线程。编解码消息体默认使用 workGroup 线程,可通过 decode.in.io
配置进行修改。
那么在 dubbo 中,工作线程长什么样呢?
dubbo 允许用户通过 SPI 机制修改工作线程。默认提供的工作线程有如下:
- CachedThreadPool(cached)
线程池模型为Executors.newCachedThreadPool()
- EagerThreadPool(ager)
核心线程数满时,创建新的线程去执行任务,而不是将任务放到阻塞队列中 - FixedThreadPool(默认-fixed)
线程池模型为Executors.newFixedThreadPool()
- LimitedThreadPool(limited)
不会收缩的线程池
可以通过 dubbo.protocol.threadpool
修改默认行为
2、任务派发
我们知道 reactor 有分 IO 线程、工作线程。如果派发到工作线程,则还需要将任务提交给工作线程。如果在 IO 线程执行,则免去这一步骤, 效率更高。因为接收消息的就是 IO 线程。
dubbo 提供了以下几种分发策略
- all 所有消息都派发到业务线程池,包括请求,响应,连接事件,断开事件,心跳等。(默认)
direct
所有消息都不派发到业务线程池,全部在 workGroup 线程上直接执行。message
只有请求响应消息派发到业务线程池,其它连接断开事件,心跳等消息,直接在 workGroup 线程上执行。execution
只有请求消息派发到业务线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 workGroup 线程上执行。connection
在 workGroup 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到业务线程池。
3、粘包、半包处理
代码入口:InternalDecoder#decode()
粘包、半包的处理原则是:不停的读取数据流,直到数据流成为一个完整的包。至于如何判断是否为完整的包,则由程序决定。RPC 中,常用的是 固定的消息头大小 + 可变的消息体。并在消息头在声明消息体的大小。dubbo 采用的就是这种方式。
4、心跳机制
dubbo 没有重新设计心跳机制,而是使用 netty 自带的心跳机制。
4.1、client
功能为:客户端如果 60s 没有发送数据包,则发送一次心跳包。每次发完心跳包,都会检查连接是否已经失效。如果是,则关闭连接,并从本地缓存中删掉该连接
代码位置 NettyClient#doOpen()
ch.pipeline()
.addLast("client-idle-handler", new IdleStateHandler(60000, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
具体的处理机制, 代码位置 NettyClientHandler
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// send heartbeat when read idle.
if (evt instanceof IdleStateEvent) {
try {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
if (logger.isDebugEnabled()) {
logger.debug("IdleStateEvent triggered, send heartbeat to channel " + channel);
}
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(HEARTBEAT_EVENT);
channel.send(req);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
} else {
super.userEventTriggered(ctx, evt);
}
}
4.2、server
功能为:如果 180s 没有任何的收发数据包,则关闭连接
代码位置 NettyServer#doOpen
ch.pipeline()
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
代码位置 NettyServerHandler
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// server will close channel when server don't receive any heartbeat from client util timeout.
if (evt instanceof IdleStateEvent) {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
logger.info("IdleStateEvent triggered, close channel " + channel);
channel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
super.userEventTriggered(ctx, evt);
}
5、client 重连机制
client 通过定时任务方式,定时检查 client 是否需要重连.
5.1、创建重连定时任务
代码入口:HeaderExchangeClient(Client, boolean)
默认每隔 180s 检测一次。即,心跳时间的 3 倍。
需要注意,大部分框架,都会将判断应用是否死亡时间,设置为心跳时间的 3倍。主要的原因是:可能出于网络原因,导致心跳包没有接收到。因此给了 client 2 次重试的机会。
5.2、是否触发重连判断
代码入口:ReconnectTimerTask#doTask(Channel)
如果应用失去连接,则直接重连,否则判断上次读的时间是否超过了空闲时间(180s),如果是也触发重连。
需要注意的是,在网络编程中,应用可连接不代表应用是存活正常的,程序有可能出于假死状态。因此需要触发重连。
6、总结
dubbo 对网络编程做了高度的封装,本质还是不离网络编程,例如 粘包、半包、协议设计、心跳、重连等网络开发中会遇到的常见问题。