netty4-数据读取性能大于处理时“直接内存水位处理器”使用

PS:禁止拷贝形式转载,转载请以URL形式

1 简介

实现程序

Tcp-Client

Udp-Server

Udp-Client

Tcp-Server

背景: 业务需要实现大致如上图场景程序, Udp Server 读取Udp Client 的数据然后通过 Tcp client 发送出去 。

问题: 使用netty4 实现了一版程序,发现会出现下述场景:Udp Server读取出了大量数据但是Tcp client处理速度小于读取,导致数据积压抛出io.netty.util.internal.OutOfDirectMemoryError无法申请直接内存DirectBuf进而无法读取数据,最终UDP Server断开。

结果: 最终借鉴netty 写入时的WRITE_BUFFER_WATER_MARK高低水位限流实现了一个作用在UDP Server读取时我称为直接内存水位处理器DirectMemoryWaterHandler用来处理数据积压的场景

2 参考

超详细图文详解神秘的 Netty 高性能内存管理

深入理解Netty—从偶现宕机看Netty流量控制

3 环境

java:1.8

netty:4.1.90.Final

4 分析

PS:当前我使用的ByteBufAllocator实际为PooledByteBufAllocator(总共就两种池化和非池化,后续例子都是以池化进行解释)

  1. 协议上Udp在设计上无需连接、会话、ACK等额外限制只考虑发或收的情况下UDP读写都必然要高于TCP,所以无程序或业务限制下必然出现上述问题

  2. netty类库读取数据(TCP,UDP都是)强制使用直接内存DirectBuf,写入的话最终也是使用直接内存DirectBuf写入(如果写入是堆内存HeapBuf最终在写入前会复制到直接内存DirectBuf在写入),即读写都是DirectBuf且这个DirectBufByteBufAllocator进行创建管理。

  3. 当前我使用的ByteBufAllocatorPooledByteBufAllocator,基本当前程序不去额外配置的话只会存在一个ByteBufAllocator什么你配置了多个自己

  4. PooledByteBufAllocator创建管理内存DirectBuf但是最终调用PlatformDependent去创建内存,PooledByteBufAllocator.pinnedDirectMemory()查询当前buf分配器内pinned住的内存,
    PlatformDependent.maxDirectMemory()PlatformDependent.usedDirectMemory()查询当前系统下netty最大直接内存和以用直接内存。
    netty 直接内存DirectBuf再写入socket channel时会被pinned住,即netty 管理的DirectBuf会有如下关系

最大直接内存:maxDirectMemory

已用直接内存:usedDirectMemory

pinned住内存:pinnedDirectMemory

  1. 当前我使用的ByteBufAllocator实际为PooledByteBufAllocator(总共就两种池化和非池化,后续例子都是以池化进行解释),因为池化会缓存DirectBuf且缓存DirectBuf也被统计为已用直接内存,会出现已用内存占最大内存50%但是我们实际可用任是100%,结合我的业务场景使用的DirectBuf都是写入socket 中即会pinned住,所以后续判断是否触发水位线时使用已用直接内存usedDirectMemory,判断是否解除水位线时使用pinnedDirectMemory

5 实现

其实根据上述分析,实现重点

  1. 创建直接内存水位处理器DirectMemoryWaterHandler,设置大小水位线条
  2. 每次UDP Server UdpServerHandler读取数据时,由直接内存水位处理器DirectMemoryWaterHandler判断当前实际使用直接内存大小进行判断是否触发水位,根据状态进行相应数据处理(当前演示代码处理实现为丢弃)

DirectMemoryWaterHandler:直接内存水位处理器

import io.netty.buffer.PooledByteBufAllocator;

import io.netty.channel.socket.DatagramPacket;
import io.netty.util.internal.PlatformDependent;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @ClassName
 * @Description
 * @Author dyf
 * @Date 2023/6/14
 * @Version 1.0
 */
@Slf4j
public class DirectMemoryWaterHandler {

    //低水位线百分比:当前固定为0.2
    private final float lowWaterMark = 0.2F;
    //高水位线百分比:当前固定为0.95
    private final float highWaterMark = 0.95F;
    private final PooledByteBufAllocator pooledByteBufAllocator;
    private final AtomicBoolean waterState = new AtomicBoolean(false);

    public DirectMemoryWaterHandler(PooledByteBufAllocator pooledByteBufAllocator) {
        this.pooledByteBufAllocator = pooledByteBufAllocator;
    }

    public boolean handle(DatagramPacket packet){
        //当前netty 最大直接内存大小
        long max = PlatformDependent.maxDirectMemory();
        //当前netty 以用直接内存大小
        long use = PlatformDependent.usedDirectMemory();
        double percentage = use / (double) max;

        //水位限制
        if (waterState.get()) {
            //处于水位控制中,当前演示代码处理实现为丢弃
            packet.release();
            //为了处理PooledByteBufAllocator,会缓存直接内存导致usedDirectMemory不是实际使用内存
            //结合当前业务场景使用pinnedDirectMemory这个值更符合实际场景,当然该值会比正常的实际使用usedDirectMemory小一点,但是我当前业务场景可以忽略
            percentage = pooledByteBufAllocator.pinnedDirectMemory() / (double) max;
            if (percentage < lowWaterMark) {
                waterState.set(false);
                log.warn("UDP Server 通道水位限制解除,当前水位[{}%-{}] 限制水位[{}%] 最大容量:[{}]", percentage, use, highWaterMark, max);
            }
            return false;
        } else if (percentage > highWaterMark) {
            //处于水位控制中,当前演示代码处理实现为丢弃
            packet.release();
            waterState.set(true);
            log.warn("UDP Server 通道水位限制生效,当前水位[{}%-{}] 限制水位[{}%] 最大容量:[{}]", percentage, use, highWaterMark, max);
            return false;
        }
        return true;
    }

}

UdpServerHandler:UDP读取时限流使用

import io.netty.buffer.PooledByteBufAllocator;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import lombok.extern.slf4j.Slf4j;

/**
 * @ClassName
 * @Description
 * @Author dyf
 * @Date 2023/5/6
 * @Version 1.0
 */
@Slf4j
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
    
    private DirectMemoryWaterHandler handler;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        handler = new DirectMemoryWaterHandler((PooledByteBufAllocator) ctx.alloc());
    }
    
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket packet) throws Exception {
        if (handler.handle(packet)){
            //业务处理:比如当前我的业务场景TCP客户端发送数据
        }
    }
}

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYz6kUEd' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片