编码和解码的基本介绍
编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码。
codec(编解码器) 的组成部分有两个:
- decoder(解码器)和 encoder(编码器)。
- encoder 负责把业务数据转换成字节码数据
- decoder 负责把字节码数据转换成业务数据
Netty 自身提供了一些 codec(编解码器)
Netty 提供的编码器
-
StringEncoder,对字符串数据进行编码
-
ObjectEncoder,对 Java 对象进行编码
……
Netty 提供的解码器
-
StringDecoder, 对字符串数据进行解码
-
ObjectDecoder,对 Java 对象进行解码
……
但是Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是 Java 序列化技术 , 而Java 序列化技术本身效率就不高,存在如下问题
- 无法跨语言
- 序列化后的体积太大,是二进制编码的 5 倍多。
- 序列化性能太低
引出 新的解决方案 [Google 的 Protobuf
]
Protobuf
Protobuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC[远程过程调用remote procedure call ] 数据交换格式
目前很多公司http+json--->tcp+protobuf
语言指南developers.google.com/protocol-bu…
Protobuf 是以message
的方式来管理数据的.
支持跨平台、跨语言,即[客户端和服务器端可以是不同的语言编写的] (支持目前绝大多数语言,例如 C++、C#、Java、python 等),高性能,高可靠性。
使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto 文件进行描述。说明,在idea 中编写 .proto 文件时,会自动提示是否下载 .ptotot 编写插件. 可以让语法高亮。(考察)
然后通过protoc.exe
编译器根据.proto 自动生成.java 文件
单对象案例
客户端可以发送一个Student PoJo 对象到服务器 (通过 Protobuf 编码)
服务端能接收Student PoJo 对象,并显示信息(通过 Protobuf 解码
环境搭建
首先Idea下载插件Protocol Buffer Editor
下载protochttps://github.com/protocolbuffers/protobuf/releases?after=v3.7.1解压
导入依赖
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>
代码编写
发送对象
Student.proto
syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO";//生成的外部类名,同时也是文件名
//protobuf 使用message 管理数据
message Student { //会在 StudentPOJO 外部类生成一个内部类 Student, 他是真正发送的POJO对象
int32 id = 1; // Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
string name = 2;
}
写完之后将该文件复制到protoc的bin目录下(之所以这样做是没配置环境变量,MAC必须要配置)
进入终端输protoc --java_out=. Student.proto
得到.java文件
复制到项目中,查看。
客服端
socketChannel.pipeline().addLast("encoder", new ProtobufEncoder());
在pipeline中加入ProtoBufEncoder
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建客户端启动对象Bootstrap 而不是服务端使用的ServerBootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(eventExecutors)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//在pipeline中加入ProtoBufEncoder
socketChannel.pipeline().addLast("encoder", new ProtobufEncoder());
socketChannel.pipeline().addLast(new NettyClientHandler());//加入自己的处理器
}
});
System.out.println("客户端 ok..");
//启动客户端去连接服务端
//关于ChannelFuture要分析,涉及到netty到异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6688).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("豹子头 林冲").build();
通过内部类构造Student对象
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发生一个Student对象到服务器
StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("豹子头 林冲").build();
ctx.writeAndFlush(student);
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:"+ctx.channel().remoteAddress());
}
//处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel();
}
}
服务器
socketChannel.pipeline().addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
添加Protobuf解码器指定对Student对象解码。
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建BoosGroup和WorkerGroup
NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
//创建服务器端启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程链进行设置
bootstrap.group(boosGroup, workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128)//设置线程队列等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态
//.handler(null)//该handler对应bossGroup,childHandler对应workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//在pipeline中加入ProtobufDecoder 指定对Student对象解码
socketChannel.pipeline().addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});//给我们的workerGroup的EventLoop对应的管道设置处理器
System.out.println("......服务器 is ready...");
//启动服务器并绑定一个端口并且同步,生成了一个ChannelFuture对象
ChannelFuture cf = bootstrap.bind(6688).sync();
//给cf注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6688 成功");
} else {
System.out.println("监听端口 6688 失败");
}
}
});
//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class NettyServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
//读取从客户端发发送端StudentPojo.Student
System.out.println("客户端发送端数据 id=" + msg.getId() + " 名字=" + msg.getName());
}
//数据读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}
//处理异常,一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
测试
启动服务器和客户端,接受成功。
多对象案例
1)客户端可以随机发送Student PoJo/ Worker PoJo 对象到服务器 (通过 Protobuf 编码)
2)服务端能接收Student PoJo/ Worker PoJo 对象(需要判断是哪种类型),并显示信息(通过 Protobuf 解码)
syntax = "proto3";//版本
option optimize_for = SPEED;//加快解析
option java_package = "com.kylin.netty.codec2";//指定生成到哪个包下
option java_outer_classname = "MyDataInfo";//外部类名
//protobuf 可以使用message 管理其他的message
message MyMessage{
//定义一个枚举类型
enum DataType{
StudentType = 0;//在proto3要求enum的编号从0开始
WorkerType = 1;
}
//用data_type来标识传的是哪一个枚举类型
DataType data_type = 1;
//表示每次枚举类型最多只能出现其中的一个,节省空间
oneof dataBody{
Student student = 2;
Worker worker = 3;
}
}
message Student{
int32 id = 1;//Student类的属性
string name = 2;
}
message Worker{
string name = 1;
int32 age = 2;
}
同样在bin目录下protoc --java_out=. Student.proto
复制到项目目录中并打开。
客户端
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建客户端启动对象Bootstrap 而不是服务端使用的ServerBootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(eventExecutors)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//在pipeline中加入ProtoBufEncoder
socketChannel.pipeline().addLast("encoder", new ProtobufEncoder());
socketChannel.pipeline().addLast(new NettyClientHandler());//加入自己的处理器
}
});
System.out.println("客户端 ok..");
//启动客户端去连接服务端
//关于ChannelFuture要分析,涉及到netty到异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6688).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//随机发送Student或Worker对象
int random = new Random().nextInt(3);
MyDataInfo.MyMessage message = null;
if (0 == random) {//发送student对象
message = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType)
.setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 卢俊义").build())
.build();
} else {
message = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
.setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("法外狂徒 张三").build())
.build();
}
ctx.writeAndFlush(message);
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
}
//处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel();
}
}
服务器
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建BoosGroup和WorkerGroup
NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
//创建服务器端启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程链进行设置
bootstrap.group(boosGroup, workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128)//设置线程队列等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态
//.handler(null)//该handler对应bossGroup,childHandler对应workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//在pipeline中加入ProtobufDecoder
socketChannel.pipeline().addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});//给我们的workerGroup的EventLoop对应的管道设置处理器
System.out.println("......服务器 is ready...");
//启动服务器并绑定一个端口并且同步,生成了一个ChannelFuture对象
ChannelFuture cf = bootstrap.bind(6688).sync();
//给cf注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6688 成功");
} else {
System.out.println("监听端口 6688 失败");
}
}
});
//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
//读取从客户端发送的数据
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
//发送的是学生数据
if (dataType == MyDataInfo.MyMessage.DataType.StudentType) {
System.out.println("[学生] " + "ID=" + msg.getStudent().getId() + " Name=" + msg.getStudent().getName());
} else if (dataType == MyDataInfo.MyMessage.DataType.WorkerType) {//发送的是工人数据
System.out.println("[工人] " + "Age=" + msg.getWorker().getAge() + " Name=" + msg.getWorker().getName());
}
}
//数据读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}
//处理异常,一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
测试
运行一个服务器运行多个客户端查看结果
测试成功~