21.Netty源码之编码器

Netty如何实现自定义通信协议

在学习完如何设计协议之后,我们又该如何在 Netty 中实现自定义的通信协议呢?其实 Netty 作为一个非常优秀的网络通信框架,已经为我们提供了非常丰富的编解码抽象基类,帮助我们更方便地基于这些抽象基类扩展实现自定义协议。

首先我们看下 Netty 中常用的编解码器有哪些。

一次编解码器和二次编解码器

Netty中的编解码器分为一次编解码和二次编解码。

一次编解码器:MessageToByteEncoder、ByteToMessageDecoder/ReplyingDecoder

二次编解码器:MessageToMessageEncoder、MessageToMessageDecoder

以解码为例,一次解码器用于解决TCP拆包/粘包问题,解析得到字节数据。

如果需要对解析后的字节数据做对象转换,需要使用二次解码器。同理,编码器是相反过程。

为什么需要二次码/编码

假设我们把解决半包粘包问题的常用三种解码器叫一次解码器:

image.png

那么我们在项目中,除了可选的的压缩解压缩之外,还需要一层解码,因为一次解码的结果是字节,需要和项目中所使用的对象做转化,方便使用,这层解码器可以称为“二次解码器”。

相应的,对应的二次编码器是为了将 Java 对象转化成字节流方便存储或传输。

为什么不合并一次二次解码器

思考:是不是也可以一步到位? 合并 1 次解码(解决粘包、半包)和 2 次解码(解决可操作问题)

可以,但是不建议: •没有分层,不够清晰;分层可以组合。 •耦合性高,不容易置换方案。

常用的编解码方式

-Java 序列化

-Marshaling

-XML

-JSON

-MessagePack

-Protobuf

-其他

选择编解码方式的因素

-空间:编码后占用空间 -时间:编解码速度 -是否追求可读性 -是否支持多语言,例如msgpack的支持:Java\C\Python等

Protobuf

-Protobuf 是一个灵活的、高效的用于序列化数据的协议。

-相比较 XML 和 JSON 格式,Protobuf 更小、更快、更便捷。

-Protobuf 是跨语言的,并且自带了一个编译器(protoc),只需要用它进行编译,可以自动生成 Java、python、C++ 等代码,不需要再写其他代码。

Protobuf使用步骤

第1步:在Maven 项目中引入 Protobuf 坐标,下载相关的jar包。

在pom.xml中 添加依赖

<dependencies>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.6.1</version>
</dependency>
</dependencies>

第 2 步: 编写proto文件:Student.proto。

Student.proto的内容

syntax = "proto3"; //版本
option java_outer_classname = "StudentPoJO"; //指定生成的Java类名

//内部类的名称,是真正的PoJo 类
message Student{ // message 的规定的
   int32 id = 1; //PoJo 类的属性数据类型类型和 序号(不是属性值)
   string name = 2;
}

第 3 步:通过 protoc.exe 根据描述文件生成 Java 类。

说明:protoc-3.6.1-win32 是从网上下载的 google 提供的文件.

cmd执行命令生成StudentPoJO.java: C:\Users\Administrator\Desktop\Netty资料\我的\资料\protoc-3.6.1-win32\bin>protoc.exe –java_out=. Student.proto

第4步:把生成的 StudentPoJo.java 拷贝到自己的项目中打开。

第 5 步:在 Netty 中使用。

ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(new ProtobufDecoder(StudentPoJO.Student.getDefaultInstance())); 
​

ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); 
ch.pipeline().addLast(new ProtobufEncoder());

抽象编码类

所有的解码器都继承了ChannelInBoundHandler。因为解码是需要解码接收的数据。所以使用In。

所有的编码器都继承了ChannelOutBoundHandler。因为编码是需要将对外发送的数据编码。所以使用Out。

image.png

通过抽象编码类的继承图可以看出编码类是 ChanneOutboundHandler 的抽象类实现,具体操作的是 Outbound 出站数据。

常用编码器类型

  • MessageToByteEncoder 对象编码成字节流;
  • MessageToMessageEncoder 一种对象消息类型编码成另外一种对象消息类型。

使用一次编码器IntegerEncoder和二次编码器IntegerToStringEncoder,将消息从Integer编码为String。

class IntegerEncoder extends MessageToByteEncoder<Integer> {
    @Override

    public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
        out.writeInt(msg);
    }

}

​

class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {
    @Override


    public void encode(ChannelHandlerContext ctx, Integer message, List<Object> out) throws Exception {
        out.add(message.toString());
    }



}


使用一次编码器StringEncoder和二次编码器StringToIntegerEncoder,将消息从String编码为Integer。

class StringEncoder extends MessageToByteEncoder<String> {
    @Override

    public void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        out.writeCharSequence(msg, Charset.defaultCharset());
    }

}

​

class StringToIntegerEncoder extends MessageToMessageEncoder<String> {
    @Override


    public void encode(ChannelHandlerContext ctx, String message, List<Object> out) throws Exception {
        out.add(Integer.parseInt(message));
    }



}


编码器MessageToByteEncoder

MessageToByteEncoder用于将对象编码成字节流,MessageToByteEncoder 提供了唯一的 encode 抽象方法,我们需要实现encode 方法即可完成自定义编码。那么encode() 方法是在什么时候被调用的呢?

我们一起看下MessageToByteEncoder 的核心源码片段,如下所示。

MessageToByteEncoder继承自ChannelOutboundHandlerAdapter。

ChannelOutboundHandlerAdapter 实现了 ChannelOutboundHandler接口,重写了write方法。

这里使用了模板模式:encode方法交给具体的子类实现。

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
​

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        // 1.消息类型是否匹配 不匹配不会处理   
        // 即传入的是String
        if (acceptOutboundMessage(msg)) { 
            //I是泛型
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            // 2. 分配 ByteBuf 资源
            buf = allocateBuffer(ctx, cast, preferDirect); 
            try {
            // 3. 执行 encode 方法完成数据编码
                encode(ctx, cast, buf); 
            } finally {
                ReferenceCountUtil.release(cast);
            }
            if (buf.isReadable()) {
            // 4. 向后传递写事件
                ctx.write(buf, promise); 
            } else {

                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}
}
//供子类重写
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

MessageToByteEncoder 重写了 ChanneOutboundHandler 的 write() 方法,其主要逻辑分为以下几个步骤:

  1. acceptOutboundMessage 判断是否有匹配的消息类型,如果匹配需要执行编码流程,如果不匹配直接继续传递给下一个 ChannelOutboundHandler。
  2. 分配 ByteBuf 资源,默认使用堆外内存。
  3. 调用子类实现的 encode 方法完成数据编码,一旦消息被成功编码,会通过调用ReferenceCountUtil.release(cast) 自动释放。
  4. 如果 ByteBuf 可读,说明已经成功编码得到数据,然后写入 ChannelHandlerContext 交到下一个节点。如果 ByteBuf 不可读,则释放 ByteBuf 资源,向下传递空的 ByteBuf 对象。

实现类:StringToByteEncoder

编码器实现非常简单,不需要关注拆包/粘包问题。

如下例子,展示了如何将字符串类型的数据写入到 ByteBuf 实例,ByteBuf 实例将传递给 ChannelPipeline 链表中的下一个 ChannelOutboundHandler。

package io.netty.example.Encode;
​

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
​

public class StringToByteEncoder extends MessageToByteEncoder<String> {
​
    @Override


    protected void encode(ChannelHandlerContext channelHandlerContext, String data, ByteBuf byteBuf) throws Exception {
      byteBuf.writeBytes(data.getBytes());
    }



}


编码器MessageToMessageEncoder

www.javajike.com/book/essent…

MessageToMessageEncoder 与 MessageToByteEncoder 类似,同样只需要实现 encode 方法。

与 MessageToByteEncoder 不同的是,MessageToMessageEncoder 是将一种格式的消息转换为另外一种格式的消息。

其中第二个 Message 所指的可以是任意一个对象,如果该对象是 ByteBuf 类型,那么和 MessageToByteEncoder 的实现原理是一致的。

MessageToMessageEncoder继承自ChannelOutboundHandlerAdapter。

ChannelOutboundHandlerAdapter 实现了 ChannelOutboundHandler接口,重写了write方法。

这里使用了模板模式:encode方法交给具体的子类实现。

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        CodecOutputList out = null;
        try {
            // 1. 消息类型是否匹配 不匹配不会处理  
            // 即传入的是String
            if (acceptOutboundMessage(msg)) {
                out = CodecOutputList.newInstance();
                 //I是泛型
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    //执行子类encode完成具体编码操作
                    encode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
                //如果输出结果是对象列表out是空
                if (out.isEmpty()) {
                    out.recycle();
                    out = null;
                    throw new EncoderException(" must produce at least one message.");
                }
            } else {

                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new EncoderException(t);
        } finally {
            if (out != null) {
                final int sizeMinusOne = out.size() - 1;
                // sizeMinusOne等于0说明 out长度是1
                if (sizeMinusOne == 0) {
                    //写出去
                    ctx.write(out.getUnsafe(0), promise);
                } else if (sizeMinusOne > 0) {
                    //遍历写出去
                    if (promise == ctx.voidPromise()) {
                        writeVoidPromise(ctx, out);
                    } else {
                        writePromiseCombiner(ctx, out, promise);
                    }
                }
                //回收
                out.recycle();
            }
        }
    }

此外 MessageToByteEncoder 的输出结果是对象列表out,编码后的结果属于中间对象,最终仍然会转化成 ByteBuf 进行传输。

MessageToMessageEncoder 常用的实现子类有 StringEncoder、LineEncoder、Base64Encoder 等。以 StringEncoder 为例看下 MessageToMessageEncoder 的用法。

实现类:StringEncoder

@Sharable
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {
​

    // TODO Use CharsetEncoder instead.
    private final Charset charset;
​

    /**
     * Creates a new instance with the current system character set.
     */
    public StringEncoder() {
        this(Charset.defaultCharset());
    }



​
    /**
     * Creates a new instance with the specified character set.
     */
    public StringEncoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }
​
    @Override
    protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
        if (msg.length() == 0) {
            return;
        }
        //编码以后加入out列表 由父类写出即可
        out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
    }
}
​

思考:现在有1个Java对象要编码为json字符串后转换为byte传输 如何做呢?

1.继承MessageToMessageEncoder

2.重写encode方法

3.将对象序列化为json

4.将json转为ByteBuffer发送:ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(json), charset)

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYb2diSs' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片