handler基本说明
Netty的主要组件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等
其中ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。
例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的。
ChannelPipeline提供了ChannelHandler链的容器
以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理,反之则称为入站的。同理,对于服务端来说,也是一样的,有数据来就是入站,有数据输出就是出站。
为什么服务端和客户端的handler都是继承SimpleChannelInboundHandler,而没有ChannelOutboundHandler出站类?
实际上当我们在handler中调用ctx.writeAndFlush()方法后,就会将数据交给ChannelOutboundHandler进行出站处理,只是我们没有去定义出站类而已,若有需求可以自己去实现ChannelOutboundHandler出站类。
总结就是客户端和服务端都有出站和入站的操作,形成一个循环链
服务端-------->出站-------->入站-------->客户端 -------->出站-------->入站-------->服务端
编解码器
当Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java对象);如果是出站消息,它会被编码成字节。
Netty提供一系列实用的编解码器,他们都实现了ChannelInboundHadnler
或者ChannelOutboundHandler
接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。
解码器
ByteToMessageDecoder decode()
方法进行解码
1)由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理。
实例
public class ToIntegerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//判断ByteBuf的字节数是否大于4个字节
if (in.readableBytes() >= 4) {
out.add(in.readInt());//读取Int型数据添加到List中,自动装箱
}
}
}
这个例子,每次入站从ByteBuf中读取4字节,将其解码为一个int,然后将它添加到下一个List中。当没有更多元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。int在被添加到List中时,会被自动装箱为Integer。在调用readInt()方法前必须验证所输入的ByteBuf是否具有足够的数据。
编码器
MessageToByteEncoder Encoder()
方法进行编码
实例
使用自定义的编码器和解码器来说明Netty的handler 调用机制
- 客户端发送long -> 服务器
- 服务端发送long -> 客户端
服务器
public class MyServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boosGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer());
ChannelFuture channelFuture = bootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加入站的handler进行解码 MyByteToLongDecoder
pipeline.addLast(new MyByteToLongDecoder());
//出站编码
pipeline.addLast(new MyLongToByteEncoder());
//添加自定义handler
pipeline.addLast(new MyServerHandler());
}
}
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("从客户端" + ctx.channel().remoteAddress() + "读取到long" + msg);
//给客户端发送一个long
ctx.writeAndFlush(98765L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
编码器
继承MessageToByteEncoder
,重写encode()
编码方法
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
//编码方法
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("MessageToByteEncoder encode 被调用");
System.out.println("msg=" + msg);
out.writeLong(msg);
}
}
解码器
继承ByteToMessageDecode
,重写decode()
解码方法
decode 会根据接受的数据,被调用多次,直到确定没有新的元素被添加到list,或者ByteBuf没有更多到可读字节为止。
如果list out 不为空,就会将list的内容传递给下一个ChannelInBoundHandler处理,该处理器的方法也会被调用多次。
public class MyByteToLongDecoder extends ByteToMessageDecoder {
/**
*decode 会根据接受的数据,被调用多次,直到确定没有新的元素被添加到list,或者ByteBuf没有更多到可读字节为止。
* 如果list out 不为空,就会将list的内容传递给下一个ChannelInBoundHandler处理,该处理器的方法也会被调用多次。
*
* @param ctx 上下文对象
* @param in 入站的ByteBuf
* @param out List集合,将解码后的数据传给下一个handler
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder decode 被调用");
//因为long 8个字节,需要判断有8个字节,才能读取一个long
if (in.readableBytes()>=8){
out.add(in.readLong());
}
}
}
客户端
public class MyClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new MyClientInitializer());//自定义初始化类
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入一个出站的handler,对数据进行一个编码。
pipeline.addLast(new MyLongToByteEncoder());
//出站解码
pipeline.addLast(new MyByteToLongDecoder());
//加入一个自定义对handler,处理业务
pipeline.addLast(new MyClientHandler());
}
}
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器端IP:"+ctx.channel().remoteAddress());
System.out.println("收到服务器消息:"+msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler 发送数据");
ctx.writeAndFlush(123456L);//发送一个long
}
}
测试
我们先来一张图来方便我们分析结果
服务器
(客户端——->服务器):入站。服务器接受客户端发送的数据,解码器首先被调用,接着调用我们自定义的ServerHandler打印数据。
(服务器——->客户端):出站。服务器发送数据给客户端首先调用我们自定义的处理器ServerHandler,接着调用编码器,打印数据。
客户端
(客户端——->服务器):出站。客户端一启动首先调用我们自己编写的ClientHandler发送数据,接着调用编码器编码数据。
(服务器——->客户端):入站。首先使用解码器解码服务器发送的数据,接着传递调用给自定义的ClientHandler。
测试结果与分析结果一致测试成功!
细节
我们修改客户端发送的数据为一个16字节的字符串按UTF-8编码。运行测试。
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器端IP:"+ctx.channel().remoteAddress());
System.out.println("收到服务器消息:"+msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler 发送数据");
//ctx.writeAndFlush(123456L);//发送一个long
//1."abcdabcdabcdabcd" 是16个字节
//2.该处理器的前一个handler是MyLongToByteEncoder没有进行编码处理 为什么?原因在它的父类 MessageToByteEncoder
//因此我们编写 Encoder 是注意传人的数据类型和处理的数据类型一致
ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd", CharsetUtil.UTF_8));
}
}
16字节每次只读Long类型也就是8字节,所以需要读两次。每次解码过后就交给了ServerHandler处理,也就发送了98765L
从而也调用了两次编码器。
发送字符串数据没有调用Long类型编码器的编码方法。
后面是接受服务器两次发送的数据流程。
不论解码器handler 还是 编码器handler 即接收的消息类型必须与待处理的消息类型一致,否则该handler不会执行相应操作。
在解码器 进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够 ,否则接收到的结果会期望结果可能不一致