Netty快速入门实例 TCP 服务
Netty 服务器在 6668 端口监听,客户端能发送消息给服务器 “hello, 服务器~”
服务器可以回复消息给客户端 “hello, 客户端~”
首先导入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.53.Final</version>
</dependency>
服务端
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建BoosGroup和WorkerGroup
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程链进行设置
bootstrap.group(boosGroup, workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128)//设置线程队列等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
//.handler(null)//该handler对应bossGroup,childHandler对应workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//可以使用一个集合管理SocketChannel,再推送消息时,可以将业务加入到各个channel对应到NIOEventLoop到taskQueue或者scheduleTaskQueue
System.out.println("客户SocketChannel hashcode="+socketChannel.hashCode());
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});//给我们的workerGroup的EventLoop对应的管道设置处理器
System.out.println("......服务器 is ready...");
//启动服务器并绑定一个端口并且同步,生成了一个ChannelFuture对象
ChannelFuture cf = bootstrap.bind(6688).sync();
//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范),这时我们自定义一个Handler,才能称为一个handler。
channelRead()
方法读取客户端发送的消息channelReadComplete()
方法数据读取完成exceptionCaught()
方法处理异常,一般是需要关闭通道
/**
* 1.我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范)
* 2.这时我们自定义一个Handler,才能称为一个handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//读取数据实际(这里我们可以读取客户端发送的消息)
//1.ChannelHandlerContext ctx : 上下文对象,含有管道pipeline,通道channel,地址
//2.Object msg: 就是客户端发送的数据 默认Object
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将msg转成一个ByteBuf
//ByteBuf是Netty提供的,不是NIO的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
}
//数据读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}
//处理异常,一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建客户端启动对象Bootstrap 而不是服务端使用的ServerBootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(eventExecutors)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());//加入自己的处理器
}
});
System.out.println("客户端 ok..");
//启动客户端去连接服务端
//关于ChannelFuture要分析,涉及到netty到异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6688).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
channelActive()
方法:当通道就绪就会触发该方法channelRead()
方法:当通道有读取事件时,会触发exceptionCaught()
方法:出现异常执行
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server", CharsetUtil.UTF_8));
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:"+ctx.channel().remoteAddress());
}
//处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel();
}
}
实例分析
BossGroup和WorkerGroup是怎么确定下面含有多少个子线程NioEventLoop的??
BossGroup和BorkerGroup含有的子线程(NioEventLoop)的个数 默认实际 cpu核数 * 2
一直往下追,到最后会看见如下源码
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
等于0时为DEFAULT_EVENT_LOOP_THREADS
我们可以通过该代码System.out.println(Runtime.getRuntime().availableProcessors());
查看自己电脑的CPU核数。
我的电脑CPU核数为8。所以默认的NioEventLoop个数为16。我们也可以通过Debug来验证。
我们把boosGroup设置为1,workerGroup设置为8。服务端查看客户端使用的线程号。
/**
* 1.我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范)
* 2.这时我们自定义一个Handler,才能称为一个handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//读取数据实际(这里我们可以读取客户端发送的消息)
//1.ChannelHandlerContext ctx : 上下文对象,含有管道pipeline,通道channel,地址
//2.Object msg: 就是客户端发送的数据 默认Object
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程"+Thread.currentThread().getName());
System.out.println("server ctx = "+ ctx);
//将msg转成一个ByteBuf
//ByteBuf是Netty提供的,不是NIO的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
}
//数据读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}
//处理异常,一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
允许并行运行Allow parallel run
接着启动服务端,和9个客户端。查看结果。
依次使用(类似轮询)
ChannelHandlerContext
ctx对象又包含什么呢??pipeline
和channel
的关系是什么呢?
ctx.channel();
获取channel
ctx.pipeline();
获取pipeline
/**
* 1.我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范)
* 2.这时我们自定义一个Handler,才能称为一个handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//读取数据实际(这里我们可以读取客户端发送的消息)
//1.ChannelHandlerContext ctx : 上下文对象,含有管道pipeline,通道channel,地址
//2.Object msg: 就是客户端发送的数据 默认Object
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程"+Thread.currentThread().getName());
System.out.println("server ctx = "+ ctx);
System.out.println("看看channel和pipeline的关系");
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline();//本质是一个双向链表,出站入站
//将msg转成一个ByteBuf
//ByteBuf是Netty提供的,不是NIO的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
}
//数据读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}
//处理异常,一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Debug运行查看运行结果。
ctx包含了许多信息,同时包含了pipeline和channel。
pipeline中包含了channel,channel中包含了pipeline。