Netty入门案例及分析

Netty快速入门实例 TCP 服务

Netty 服务器在 6668 端口监听,客户端能发送消息给服务器 “hello, 服务器~”

服务器可以回复消息给客户端 “hello, 客户端~”

首先导入依赖

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.53.Final</version>
        </dependency>

服务端

image-20201108170611983

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        //创建BoosGroup和WorkerGroup
        NioEventLoopGroup boosGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();




        try {
            //创建服务器端启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();

            //使用链式编程链进行设置
            bootstrap.group(boosGroup, workerGroup)//设置两个线程组
                    .channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列等待连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                    //.handler(null)//该handler对应bossGroup,childHandler对应workerGroup
                    .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
                        //给pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //可以使用一个集合管理SocketChannel,再推送消息时,可以将业务加入到各个channel对应到NIOEventLoop到taskQueue或者scheduleTaskQueue
                            System.out.println("客户SocketChannel hashcode="+socketChannel.hashCode());
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });//给我们的workerGroup的EventLoop对应的管道设置处理器

            System.out.println("......服务器 is ready...");
            //启动服务器并绑定一个端口并且同步,生成了一个ChannelFuture对象
            ChannelFuture cf = bootstrap.bind(6688).sync();


            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } finally {
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

image-20201108171806580

我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范),这时我们自定义一个Handler,才能称为一个handler。

  • channelRead()方法读取客户端发送的消息
  • channelReadComplete()方法数据读取完成
  • exceptionCaught()方法处理异常,一般是需要关闭通道
/**


 * 1.我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范)


 * 2.这时我们自定义一个Handler,才能称为一个handler


 */


public class NettyServerHandler extends ChannelInboundHandlerAdapter {






    //读取数据实际(这里我们可以读取客户端发送的消息)


    //1.ChannelHandlerContext ctx : 上下文对象,含有管道pipeline,通道channel,地址


    //2.Object msg: 就是客户端发送的数据 默认Object


    @Override


    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


        //将msg转成一个ByteBuf
        //ByteBuf是Netty提供的,不是NIO的ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
    }



    //数据读取完成
    @Override

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
    }


    //处理异常,一般是需要关闭通道

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

客户端

image-20201108172548785

public class NettyClient {


    public static void main(String[] args) throws InterruptedException {

        //客户端需要一个事件循环组
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();

        try {
            //创建客户端启动对象Bootstrap 而不是服务端使用的ServerBootstrap
            Bootstrap bootstrap = new Bootstrap();

            //设置相关参数
            bootstrap.group(eventExecutors)//设置线程组
            .channel(NioSocketChannel.class)//设置客户端通道的实现类(反射)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new NettyClientHandler());//加入自己的处理器
                }
            });


            System.out.println("客户端 ok..");

            //启动客户端去连接服务端
            //关于ChannelFuture要分析,涉及到netty到异步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6688).sync();
            //给关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }

}

image-20201108172633372

  • channelActive()方法:当通道就绪就会触发该方法
  • channelRead()方法:当通道有读取事件时,会触发
  • exceptionCaught()方法:出现异常执行
public class NettyClientHandler extends ChannelInboundHandlerAdapter {


    //当通道就绪就会触发该方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client" + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server", CharsetUtil.UTF_8));
    }

    //当通道有读取事件时,会触发
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址:"+ctx.channel().remoteAddress());

    }



    //处理异常
    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.channel();
    }

}

image-20201108172846775

image-20201108172855031

实例分析

BossGroup和WorkerGroup是怎么确定下面含有多少个子线程NioEventLoop的??

BossGroup和BorkerGroup含有的子线程(NioEventLoop)的个数 默认实际 cpu核数 * 2

image-20201108174102996

一直往下追,到最后会看见如下源码

image-20201108174221044

super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);等于0时为DEFAULT_EVENT_LOOP_THREADS

image-20201108174348571

我们可以通过该代码System.out.println(Runtime.getRuntime().availableProcessors());查看自己电脑的CPU核数。

image-20201108174540695

我的电脑CPU核数为8。所以默认的NioEventLoop个数为16。我们也可以通过Debug来验证。

image-20201108174726184

image-20201108174744466

我们把boosGroup设置为1,workerGroup设置为8。服务端查看客户端使用的线程号。

image-20201108192015195

/**


 * 1.我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范)


 * 2.这时我们自定义一个Handler,才能称为一个handler


 */


public class NettyServerHandler extends ChannelInboundHandlerAdapter {






    //读取数据实际(这里我们可以读取客户端发送的消息)


    //1.ChannelHandlerContext ctx : 上下文对象,含有管道pipeline,通道channel,地址


    //2.Object msg: 就是客户端发送的数据 默认Object


    @Override


    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {




        System.out.println("服务器读取线程"+Thread.currentThread().getName());

        System.out.println("server ctx = "+ ctx);

        //将msg转成一个ByteBuf
        //ByteBuf是Netty提供的,不是NIO的ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
    }


    //数据读取完成
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
    }

    //处理异常,一般是需要关闭通道


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

允许并行运行Allow parallel run

image-20201108192238610

接着启动服务端,和9个客户端。查看结果。

image-20201108192212012

依次使用(类似轮询)

ChannelHandlerContext ctx对象又包含什么呢??pipelinechannel 的关系是什么呢?

ctx.channel();获取channel

ctx.pipeline();获取pipeline

image-20201108193902593

/**


 * 1.我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范)


 * 2.这时我们自定义一个Handler,才能称为一个handler


 */


public class NettyServerHandler extends ChannelInboundHandlerAdapter {






    //读取数据实际(这里我们可以读取客户端发送的消息)


    //1.ChannelHandlerContext ctx : 上下文对象,含有管道pipeline,通道channel,地址


    //2.Object msg: 就是客户端发送的数据 默认Object


    @Override


    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {




        System.out.println("服务器读取线程"+Thread.currentThread().getName());

        System.out.println("server ctx = "+ ctx);

        System.out.println("看看channel和pipeline的关系");
        Channel channel = ctx.channel();
        ChannelPipeline pipeline = ctx.pipeline();//本质是一个双向链表,出站入站
        //将msg转成一个ByteBuf
        //ByteBuf是Netty提供的,不是NIO的ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
    }

    //数据读取完成
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
    }

    //处理异常,一般是需要关闭通道

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Debug运行查看运行结果。

image-20201108194619905

image-20201108194746009

image-20201108194858443

ctx包含了许多信息,同时包含了pipeline和channel。

pipeline中包含了channel,channel中包含了pipeline。

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MY3vtytI' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片