Netty编解码器和handler的调用机制

handler基本说明

Netty的主要组件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等

其中ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。

例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的。

ChannelPipeline提供了ChannelHandler链的容器

以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理,反之则称为入站的。同理,对于服务端来说,也是一样的,有数据来就是入站,有数据输出就是出站。

image-20201123171017623

为什么服务端和客户端的handler都是继承SimpleChannelInboundHandler,而没有ChannelOutboundHandler出站类?

实际上当我们在handler中调用ctx.writeAndFlush()方法后,就会将数据交给ChannelOutboundHandler进行出站处理,只是我们没有去定义出站类而已,若有需求可以自己去实现ChannelOutboundHandler出站类。

总结就是客户端和服务端都有出站和入站的操作,形成一个循环链

服务端-------->出站-------->入站-------->客户端 -------->出站-------->入站-------->服务端

编解码器

当Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java对象);如果是出站消息,它会被编码成字节。

Netty提供一系列实用的编解码器,他们都实现了ChannelInboundHadnler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。

解码器

ByteToMessageDecoder decode()方法进行解码

image-20201123172449318

1)由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理。

实例

public class ToIntegerDecoder extends ByteToMessageDecoder {
   @Override
   protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
     	//判断ByteBuf的字节数是否大于4个字节
       if (in.readableBytes() >= 4) {
           out.add(in.readInt());//读取Int型数据添加到List中,自动装箱
       }
   }
}

这个例子,每次入站从ByteBuf中读取4字节,将其解码为一个int,然后将它添加到下一个List中。当没有更多元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。int在被添加到List中时,会被自动装箱为Integer。在调用readInt()方法前必须验证所输入的ByteBuf是否具有足够的数据。

编码器

MessageToByteEncoder Encoder()方法进行编码

image-20201123172403567

image-20201123173540073

实例

使用自定义的编码器和解码器来说明Netty的handler 调用机制

  • 客户端发送long -> 服务器
  • 服务端发送long -> 客户端

服务器

image-20201123202153430

public class MyServer {









    public static void main(String[] args) throws InterruptedException {

        NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);


        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boosGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer());

            ChannelFuture channelFuture = bootstrap.bind(7000).sync();




            channelFuture.channel().closeFuture().sync();


        } finally {

            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }


    }

}

image-20201123202618613

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {









    @Override


    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();


        //添加入站的handler进行解码 MyByteToLongDecoder
        pipeline.addLast(new MyByteToLongDecoder());
        //出站编码
        pipeline.addLast(new MyLongToByteEncoder());
        //添加自定义handler
        pipeline.addLast(new MyServerHandler());


    }

}

image-20201123203702147

public class MyServerHandler extends SimpleChannelInboundHandler<Long> {











    @Override

    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println("从客户端" + ctx.channel().remoteAddress() + "读取到long" + msg);


        //给客户端发送一个long
        ctx.writeAndFlush(98765L);
    }


    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

编码器

继承MessageToByteEncoder,重写encode()编码方法

image-20201123204916471

public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {









    //编码方法
    @Override
    protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
        System.out.println("MessageToByteEncoder encode 被调用");
        System.out.println("msg=" + msg);
        out.writeLong(msg);
    }
}

解码器

继承ByteToMessageDecode,重写decode()解码方法

image-20201123203915543

decode 会根据接受的数据,被调用多次,直到确定没有新的元素被添加到list,或者ByteBuf没有更多到可读字节为止。

如果list out 不为空,就会将list的内容传递给下一个ChannelInBoundHandler处理,该处理器的方法也会被调用多次。

public class MyByteToLongDecoder extends ByteToMessageDecoder {









    /**
     *decode 会根据接受的数据,被调用多次,直到确定没有新的元素被添加到list,或者ByteBuf没有更多到可读字节为止。
     * 如果list out 不为空,就会将list的内容传递给下一个ChannelInBoundHandler处理,该处理器的方法也会被调用多次。
     *
     * @param ctx 上下文对象
     * @param in  入站的ByteBuf
     * @param out List集合,将解码后的数据传给下一个handler
     * @throws Exception
     */
    @Override

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {




        System.out.println("MyByteToLongDecoder decode 被调用");
        //因为long 8个字节,需要判断有8个字节,才能读取一个long
        if (in.readableBytes()>=8){
            out.add(in.readLong());
        }

    }

}

客户端

image-20201123203957557

public class MyClient {









    public static void main(String[] args) throws InterruptedException {

        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer());//自定义初始化类


            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();


            channelFuture.channel().closeFuture().sync();



        } finally {

            group.shutdownGracefully();
        }




    }

}

image-20201123204147327

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {











    @Override

    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();


        //加入一个出站的handler,对数据进行一个编码。
        pipeline.addLast(new MyLongToByteEncoder());
        //出站解码
        pipeline.addLast(new MyByteToLongDecoder());
        //加入一个自定义对handler,处理业务
        pipeline.addLast(new MyClientHandler());
    }

}

image-20201123202853744

public class MyClientHandler extends SimpleChannelInboundHandler<Long> {










    @Override


    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {

        System.out.println("服务器端IP:"+ctx.channel().remoteAddress());

        System.out.println("收到服务器消息:"+msg);

    }





    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("MyClientHandler 发送数据");

        ctx.writeAndFlush(123456L);//发送一个long




    }
}

测试

我们先来一张图来方便我们分析结果

Handler调用机制

服务器

image-20201123205645627

(客户端——->服务器):入站。服务器接受客户端发送的数据,解码器首先被调用,接着调用我们自定义的ServerHandler打印数据。

(服务器——->客户端):出站。服务器发送数据给客户端首先调用我们自定义的处理器ServerHandler,接着调用编码器,打印数据。

客户端

image-20201123205713335

(客户端——->服务器):出站。客户端一启动首先调用我们自己编写的ClientHandler发送数据,接着调用编码器编码数据。

(服务器——->客户端):入站。首先使用解码器解码服务器发送的数据,接着传递调用给自定义的ClientHandler。

测试结果与分析结果一致测试成功!

细节

我们修改客户端发送的数据为一个16字节的字符串按UTF-8编码。运行测试。

image-20201123213136422

public class MyClientHandler extends SimpleChannelInboundHandler<Long> {










    @Override


    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {

        System.out.println("服务器端IP:"+ctx.channel().remoteAddress());

        System.out.println("收到服务器消息:"+msg);

    }





    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("MyClientHandler 发送数据");

        //ctx.writeAndFlush(123456L);//发送一个long




        //1."abcdabcdabcdabcd" 是16个字节
        //2.该处理器的前一个handler是MyLongToByteEncoder没有进行编码处理 为什么?原因在它的父类 MessageToByteEncoder
        //因此我们编写 Encoder 是注意传人的数据类型和处理的数据类型一致
        ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd", CharsetUtil.UTF_8));

    }

}

image-20201123213307154

image-20201123213427984

16字节每次只读Long类型也就是8字节,所以需要读两次。每次解码过后就交给了ServerHandler处理,也就发送了98765L从而也调用了两次编码器。

image-20201123213626062

发送字符串数据没有调用Long类型编码器的编码方法。

后面是接受服务器两次发送的数据流程。

image-20201123213724130

Handler处理细节

不论解码器handler 还是 编码器handler 即接收的消息类型必须与待处理的消息类型一致,否则该handler不会执行相应操作。

在解码器 进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够 ,否则接收到的结果会期望结果可能不一致

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MY1Nw4mU' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片